diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 1dc0baa000..865e03d863 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -1100,6 +1100,11 @@ mockito-core 2.23.4 + + org.mockito + mockito-all + 1.8.5 + org.objenesis objenesis diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/dev-support/findbugs-exclude.xml new file mode 100644 index 0000000000..e4e59d9377 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/dev-support/findbugs-exclude.xml @@ -0,0 +1,17 @@ + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/dev-support/findbugs-exclude.xml new file mode 100644 index 0000000000..e4e59d9377 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/dev-support/findbugs-exclude.xml @@ -0,0 +1,17 @@ + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml new file mode 100644 index 0000000000..02e0fdce70 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/pom.xml @@ -0,0 +1,173 @@ + + + + + hadoop-applications-mawo + org.apache.hadoop.applications.mawo + 3.3.0-SNAPSHOT + + 4.0.0 + + + hadoop-applications-mawo-core + jar + + Apache Hadoop YARN Application MaWo Core + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + test + + + + org.apache.hadoop + hadoop-common + + + + org.apache.hadoop + hadoop-common + test-jar + test + + + + com.google.inject + guice + + + + org.apache.curator + curator-framework + + + + org.apache.curator + curator-client + + + + org.apache.curator + curator-test + test + + + + commons-io + commons-io + + + + commons-cli + commons-cli + + + + org.apache.zookeeper + zookeeper + + + + junit + junit + + + com.sun.jdmk + jmxtools + + + com.sun.jmx + jmxri + + + org.jboss.netty + netty + + + + + + org.slf4j + slf4j-api + + + + com.googlecode.json-simple + json-simple + + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + + org.mockito + mockito-all + test + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + src/assembly/bin.xml + + hadoop-applications-mawo-core-${project.version} + + + + package + + single + + + + + + org.apache.rat + apache-rat-plugin + + + target/**/* + + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/assembly/bin.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/assembly/bin.xml new file mode 100644 index 0000000000..6f2d9a82dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/assembly/bin.xml @@ -0,0 +1,41 @@ + + + + bin + + tar.gz + + + + + README* + + + + + + src/main/bin + bin + + + + + target + / + + *.jar + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/AbstractTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/AbstractTask.java new file mode 100644 index 0000000000..f27c4ee780 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/AbstractTask.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Abstract class for MaWo Task. + */ +public abstract class AbstractTask implements Task { + /** + * Task identifier. + */ + private TaskId taskID = new TaskId(); + /** + * Task environment. + */ + private Map environment = new HashMap(); + /** + * Command which need to be executed as Task. + */ + private String taskCmd; + /** + * Type of task. + */ + private TaskType taskType; + /** + * Task timeout. + */ + private long timeout; + /** + * logger for abstract class. + */ + static final Logger LOG = LoggerFactory.getLogger(AbstractTask.class); + + /** + * AbstractTask constructor. + */ + public AbstractTask() { + } + + /** + * AbstrackTask constructor. + * @param taskId : Task identifier + * @param localenvironment : Task environment vars + * @param taskCMD : Cmd to run + * @param localtimeout : Task timeout in seconds + */ + + public AbstractTask(final TaskId taskId, + final Map localenvironment, + final String taskCMD, final long localtimeout) { + this(); + setTaskId(taskId); + setEnvironment(localenvironment); + setTaskCmd(taskCMD); + setTimeout(localtimeout); + LOG.info("Created Task - type: " + this.taskType + ", TaskId: " + + this.taskID.toString() + ", cmd: '" + taskCMD + "' Timeout: " + + timeout); + } + + /** + * Get environment for a Task. + * @return environment of a Task + */ + @Override + public final Map getEnvironment() { + return environment; + } + + /** + * Set environment for a Task. + * @param localenvironment : Map of environment vars + */ + @Override + public final void setEnvironment(final Map localenvironment) { + this.environment = localenvironment; + } + + /** + * Get TaskCmd for a Task. + * @return TaskCMD: Its a task command line such as sleep 10 + */ + @Override + public final String getTaskCmd() { + return taskCmd; + } + + /** + * Set TaskCmd for a Task. + * @param taskCMD : Task command line + */ + @Override + public final void setTaskCmd(final String taskCMD) { + this.taskCmd = taskCMD; + } + + /** + * Get TaskId for a Task. + * @return TaskID: Task command line + */ + @Override + public final TaskId getTaskId() { + return taskID; + } + + /** + * Set Task Id. + * @param taskId : Task Identifier + */ + @Override + public final void setTaskId(final TaskId taskId) { + if (taskId != null) { + this.taskID = taskId; + } + } + + /** + * Get TaskType for a Task. + * @return TaskType: Type of Task + */ + @Override + public final TaskType getTaskType() { + return taskType; + } + + /** + * Set TaskType for a Task. + * @param type Simple or Composite Task + */ + public final void setTaskType(final TaskType type) { + this.taskType = type; + } + + /** + * Get Timeout for a Task. + * @return timeout in seconds + */ + @Override + public final long getTimeout() { + return this.timeout; + } + + /** + * Set Task Timeout in seconds. + * @param taskTimeout : Timeout in seconds + */ + @Override + public final void setTimeout(final long taskTimeout) { + this.timeout = taskTimeout; + } + + /** + * Write Task. + * @param out : dataoutout object. + * @throws IOException : Throws IO exception if any error occurs. + */ + @Override + public final void write(final DataOutput out) throws IOException { + taskID.write(out); + int environmentSize = 0; + if (environment == null) { + environmentSize = 0; + } else { + environmentSize = environment.size(); + } + new IntWritable(environmentSize).write(out); + if (environmentSize != 0) { + for (Entry envEntry : environment.entrySet()) { + new Text(envEntry.getKey()).write(out); + new Text(envEntry.getValue()).write(out); + } + } + Text taskCmdText; + if (taskCmd == null) { + taskCmdText = new Text(""); + } else { + taskCmdText = new Text(taskCmd); + } + taskCmdText.write(out); + WritableUtils.writeEnum(out, taskType); + WritableUtils.writeVLong(out, timeout); + } + + /** + * Read Fields from file. + * @param in : datainput object. + * @throws IOException : Throws IOException in case of error. + */ + @Override + public final void readFields(final DataInput in) throws IOException { + this.taskID = new TaskId(); + taskID.readFields(in); + IntWritable envSize = new IntWritable(0); + envSize.readFields(in); + for (int i = 0; i < envSize.get(); i++) { + Text key = new Text(); + Text value = new Text(); + key.readFields(in); + value.readFields(in); + environment.put(key.toString(), value.toString()); + } + Text taskCmdText = new Text(); + taskCmdText.readFields(in); + taskCmd = taskCmdText.toString(); + taskType = WritableUtils.readEnum(in, TaskType.class); + timeout = WritableUtils.readVLong(in); + } + + /** + * ToString. + * @return String representation of Task + */ + @Override + public final String toString() { + return "TaskId: " + this.taskID.toString() + ", TaskType: " + this.taskType + + ", cmd: '" + taskCmd + "'"; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/CompositeTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/CompositeTask.java new file mode 100644 index 0000000000..05c17f8743 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/CompositeTask.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +/** + * Composite Task is Task with multiple commands. + */ +public class CompositeTask extends AbstractTask { + /** + * Composite Task initializer. + */ + public CompositeTask() { + super(); + setTaskType(TaskType.COMPOSITE); + } + + /** + * Composite initializer for specific task. + * @param task : Task object + */ + public CompositeTask(final Task task) { + super(task.getTaskId(), task.getEnvironment(), task.getTaskCmd(), + task.getTimeout()); + this.setTaskType(TaskType.COMPOSITE); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/DieTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/DieTask.java new file mode 100644 index 0000000000..6ac778fe70 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/DieTask.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +/** + * Die Task is a type of task which indicates app to die. + */ +public class DieTask extends AbstractTask { + /** + * Die Task constructor. + */ + public DieTask() { + super(); + setTaskType(TaskType.DIE); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/MawoConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/MawoConfiguration.java new file mode 100644 index 0000000000..a019913ccd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/MawoConfiguration.java @@ -0,0 +1,502 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Singleton; + +/** + * MaWo configuration class. + */ +@Singleton +public final class MawoConfiguration { + /** + * logger initialization for mawo config class. + */ + static final Logger LOG = LoggerFactory.getLogger(MawoConfiguration.class); + /** + * Define comma separator. + */ + static final String COMMA_SPLITTER = ","; + /** + * MaWo config file name. + */ + public static final String CONFIG_FILE = "mawo.properties"; + + /** + * RPC server hostname. + */ + private static final String RPC_SERVER_HOSTNAME = "rpc.server.hostname"; + /** + * RPC server port. + */ + private static final String RPC_SERVER_PORT = "rpc.server.port"; + + // Default values + /** + * RPC server hostname default value. + */ + private static final String RPC_SERVER_HOSTNAME_DEFAULT = "localhost"; + /** + * RPC server port default value. + */ + private static final String RPC_SERVER_PORT_DEFAULT = "5121"; + + // Curator related Configurations + /** + * Config to check id Job Queue Storage is enabled. + */ + private static final String JOB_QUEUE_STORAGE_ENABLED = + "mawo.job-queue-storage.enabled"; + + /** + * ZooKeeper property prefix. + */ + private static final String ZK_PREFIX = "zookeeper."; + /** + * Property for ZooKeeper address. + */ + private static final String ZK_ADDRESS = ZK_PREFIX + "address"; + /** + * Default value for ZooKeeper address. + */ + private static final String ZK_ADDRESS_DEFAULT = "localhost:2181"; + + /** + * Property for ZooKeeper parent path. + */ + private static final String ZK_PARENT_PATH = ZK_PREFIX + "parent.path"; + /** + * Property for ZooKeeper parent path default value. + */ + private static final String ZK_PARENT_PATH_DEFAULT = "/mawoRoot"; + + /** + * Property for ZooKeeper retry interval. + */ + private static final String ZK_RETRY_INTERVAL_MS = + ZK_PREFIX + "retry.interval.ms"; + /** + * Default value for ZooKeeper retry interval. + */ + private static final String ZK_RETRY_INTERVAL_MS_DEFAULT = "1000"; + + /** + * Property for Zookeeper session timeout. + */ + private static final String ZK_SESSION_TIMEOUT_MS = + ZK_PREFIX + "session.timeout.ms"; + /** + * Default value for ZooKeeper session timeout. + */ + private static final String ZK_SESSION_TIMEOUT_MS_DEFAULT = "10000"; + + /** + * Property for ZooKeeper retry number. + */ + private static final String ZK_RETRIES_NUM = ZK_PREFIX + "retries.num"; + /** + * Default value for ZooKeeper retry number. + */ + private static final String ZK_RETRIES_NUM_DEFAULT = "1000"; + + /** + * Property for ZooKeeper acl. + */ + private static final String ZK_ACL = ZK_PREFIX + "acl"; + /** + * Default value for ZooKeeper acl. + */ + private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda"; + + /** + * Property for setting num of workers. + */ + private static final String WORKER_NUM_TASKS = "worker.num.tasks"; + /** + * Default value for num of workers. + */ + private static final String WORKER_NUM_TASKS_DEFAULT = "10"; + + /** + * Property for setting job builder class. + */ + public static final String JOB_BUILDER_CLASS = "mawo.job-builder.class"; + /** + * Default value for job builder class = simpleTaskJobBuilder. + */ + private static final String JOB_BUILDER_CLASS_DEFAULT = + "org.apache.hadoop.applications.mawo.server.master.job." + + "SimpleTaskJobBuilder"; + + /** + * Property for setting worker workspace. + */ + private static final String WORKER_WORK_SPACE = "worker.workspace"; + /** + * Default value for worker workspace. + */ + private static final String WORKER_WORK_SPACE_DEFAULT = "/tmp"; + + /** + * Property for resource manager url. + */ + public static final String CLUSTER_MANAGER_URL = "ycloud.url"; + /** + * Default value for resource manager url. + */ + private static final String DEFAULT_CLUSTER_MANAGER_URL = "0.0.0.0:9191"; + + /** + * Property for setting auto shutdown for worker. + */ + public static final String AUTO_SHUTDOWN_WORKERS = + "mawo.master.auto-shutdown-workers"; + /** + * Set auto shutdown of workers to False by default. + */ + private static final boolean DEFAULT_AUTO_SHUTDOWN_WORKERS = false; + + /** + * Property for task status log path in master node. + */ + public static final String MASTER_TASKS_STATUS_LOG_PATH + = "master.tasks-status.log.path"; + /** + * Default value for task status log path. + */ + private static final String MASTER_TASKS_STATUS_LOG_PATH_DEFAULT = "/tmp"; + + /** + * Property for drain event timeout. + */ + private static final String MASTER_DRAIN_EVENTS_TIMEOUT = + "master.drain-events.timeout"; + /** + * Default value for drain event timeout. + */ + private static final long MASTER_DRAIN_EVENTS_TIMEOUT_DEFAULT = 60000; + + /** + * Property for worker white list env. + * This environment variables will be set for all tasks. + */ + private static final String WORKER_WHITELIST_ENV = "worker.whitelist.env"; + /** + * Default value for worker white list env. + */ + private static final String WORKER_WHITELIST_ENV_DEFAULT = ""; + + /** + * Property for teardown worker validity. + */ + private static final String MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS = + "master.teardown-worker.validity-interval.ms"; + /** + * Default value for teardown worker validity. + */ + private static final String + MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS_DEFAULT = "120000"; + + /** + * Map of MaWo Configs. + */ + private Map configsMap; + + /** + * Mowo configuration initializer. + */ + public MawoConfiguration() { + this(readConfigFile()); + } + + /** + * Set up MaWo properties. + * @param properties : Map of properties + */ + private MawoConfiguration(final Properties properties) { + + configsMap = new HashMap(); + + configsMap.put(RPC_SERVER_HOSTNAME, properties + .getProperty(RPC_SERVER_HOSTNAME, RPC_SERVER_HOSTNAME_DEFAULT)); + configsMap.put(RPC_SERVER_PORT, + properties.getProperty(RPC_SERVER_PORT, RPC_SERVER_PORT_DEFAULT)); + + configsMap.put(ZK_ADDRESS, + properties.getProperty(ZK_ADDRESS, ZK_ADDRESS_DEFAULT)); + configsMap.put(ZK_PARENT_PATH, + properties.getProperty(ZK_PARENT_PATH, ZK_PARENT_PATH_DEFAULT)); + configsMap.put(ZK_RETRY_INTERVAL_MS, properties + .getProperty(ZK_RETRY_INTERVAL_MS, ZK_RETRY_INTERVAL_MS_DEFAULT)); + configsMap.put(ZK_SESSION_TIMEOUT_MS, properties + .getProperty(ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS_DEFAULT)); + configsMap.put(ZK_RETRIES_NUM, + properties.getProperty(ZK_RETRIES_NUM, ZK_RETRIES_NUM_DEFAULT)); + configsMap.put(ZK_ACL, properties.getProperty(ZK_ACL, ZK_ACL_DEFAULT)); + + configsMap.put(JOB_BUILDER_CLASS, + properties.getProperty(JOB_BUILDER_CLASS, JOB_BUILDER_CLASS_DEFAULT)); + + configsMap.put(JOB_QUEUE_STORAGE_ENABLED, + properties.getProperty(JOB_QUEUE_STORAGE_ENABLED, "false")); + + configsMap.put(CLUSTER_MANAGER_URL, properties + .getProperty(CLUSTER_MANAGER_URL, DEFAULT_CLUSTER_MANAGER_URL)); + + configsMap.put(WORKER_NUM_TASKS, + properties.getProperty(WORKER_NUM_TASKS, WORKER_NUM_TASKS_DEFAULT)); + + configsMap.put(WORKER_WORK_SPACE, + properties.getProperty(WORKER_WORK_SPACE, WORKER_WORK_SPACE_DEFAULT)); + + configsMap.put(AUTO_SHUTDOWN_WORKERS, properties.getProperty( + AUTO_SHUTDOWN_WORKERS, String.valueOf(DEFAULT_AUTO_SHUTDOWN_WORKERS))); + + configsMap.put(MASTER_TASKS_STATUS_LOG_PATH, properties.getProperty( + MASTER_TASKS_STATUS_LOG_PATH, + String.valueOf(MASTER_TASKS_STATUS_LOG_PATH_DEFAULT))); + + configsMap.put(MASTER_DRAIN_EVENTS_TIMEOUT, + properties.getProperty(MASTER_DRAIN_EVENTS_TIMEOUT, + String.valueOf(MASTER_DRAIN_EVENTS_TIMEOUT_DEFAULT))); + + configsMap.put(WORKER_WHITELIST_ENV, properties.getProperty( + WORKER_WHITELIST_ENV, WORKER_WHITELIST_ENV_DEFAULT)); + + configsMap.put(MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS, + properties.getProperty(MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS, + MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS_DEFAULT)); + + } + + /** + * Get MaWo config map. + * @return the config map for MaWo properties + */ + + public Map getConfigsMap() { + return configsMap; + } + + /** + * Find, read, and parse the configuration file. + * + * @return the properties that were found or empty if no file was found + */ + private static Properties readConfigFile() { + Properties properties = new Properties(); + + // Get property file stream from classpath + LOG.info("Configuration file being loaded: " + CONFIG_FILE + + ". Found in classpath at " + + MawoConfiguration.class.getClassLoader().getResource(CONFIG_FILE)); + InputStream inputStream = MawoConfiguration.class.getClassLoader() + .getResourceAsStream(CONFIG_FILE); + + if (inputStream == null) { + throw new RuntimeException(CONFIG_FILE + " not found in classpath"); + } + + // load the properties + try { + properties.load(inputStream); + inputStream.close(); + } catch (FileNotFoundException fnf) { + LOG.error( + "No configuration file " + CONFIG_FILE + " found in classpath."); + } catch (IOException ie) { + throw new IllegalArgumentException( + "Can't read configuration file " + CONFIG_FILE, ie); + } + + return properties; + } + + /** + * Get MaWo RPC server Port. + * @return value of rpc.server.port + */ + public int getRpcServerPort() { + return Integer.parseInt(configsMap.get(RPC_SERVER_PORT)); + } + + /** + * Get RPC Host map. + * @return value of rpc.server.hostname + */ + public String getRpcHostName() { + return configsMap.get(RPC_SERVER_HOSTNAME); + } + + /** + * Check if Job Queue Storage is Enabled. + * @return True if Job queue storage is enabled otherwise False + */ + public boolean getJobQueueStorageEnabled() { + return Boolean.parseBoolean(configsMap.get(JOB_QUEUE_STORAGE_ENABLED)); + } + + /** + * Get ZooKeeper Address. + * @return value of ZooKeeper.address + */ + public String getZKAddress() { + return configsMap.get(ZK_ADDRESS); + } + + /** + * Get ZooKeeper parent Path. + * @return value of ZooKeeper.parent.path + */ + public String getZKParentPath() { + return configsMap.get(ZK_PARENT_PATH); + } + + /** + * Get ZooKeeper retry interval value in milli seconds. + * @return value of ZooKeeper.retry.interval.ms + */ + public int getZKRetryIntervalMS() { + return Integer.parseInt(configsMap.get(ZK_RETRY_INTERVAL_MS)); + } + + /** + * Get ZooKeeper session timeout in milli seconds. + * @return value of ZooKeeper.session.timeout.ms + */ + public int getZKSessionTimeoutMS() { + return Integer.parseInt(configsMap.get(ZK_SESSION_TIMEOUT_MS)); + } + + /** + * Get ZooKeeper retries number. + * @return value of ZooKeeper.retries.num + */ + public int getZKRetriesNum() { + return Integer.parseInt(configsMap.get(ZK_RETRIES_NUM)); + } + + /** + * Get ZooKeeper Acls. + * @return value of ZooKeeper.acl + */ + public String getZKAcl() { + return configsMap.get(ZK_ACL); + } + + /** + * Get number of tasks a worker can run in parallel. + * @return value of worker.num.tasks + */ + public int getWorkerConcurrentTasksLimit() { + return Integer.parseInt(configsMap.get(WORKER_NUM_TASKS)); + } + + /** + * Get job builder class. + * @return value of mawo.job-builder.class + */ + public String getJobBuilderClass() { + return configsMap.get(JOB_BUILDER_CLASS); + } + + /** + * Get worker work space. + * @return value of worker.workspace + */ + public String getWorkerWorkSpace() { + return configsMap.get(WORKER_WORK_SPACE); + } + + /** + * Get cluster manager URL. + * @return value of ycloud.url + */ + public String getClusterManagerURL() { + return configsMap.get(CLUSTER_MANAGER_URL); + } + + /** + * Check if worker auto shutdown feature is enabled. + * @return value of mawo.master.auto-shutdown-workers + */ + public boolean getAutoShutdownWorkers() { + return Boolean.parseBoolean(configsMap.get(AUTO_SHUTDOWN_WORKERS)); + } + + /** + * Get Task status log file path on master host. + * @return value of master.tasks-status.log.path + */ + public String getMasterTasksStatusLogPath() { + return configsMap.get(MASTER_TASKS_STATUS_LOG_PATH); + } + + /** + * Get Master drain event timeout. + * @return value of master.drain-events.timeout + */ + public long getMasterDrainEventsTimeout() { + return Long.parseLong(configsMap.get(MASTER_DRAIN_EVENTS_TIMEOUT)); + } + + /** + * Get Worker whitelist env params. + * These params will be set in all tasks. + * @return list of white list environment + */ + public List getWorkerWhiteListEnv() { + List whiteList = new ArrayList(); + String env = configsMap.get(WORKER_WHITELIST_ENV); + if (env != null && !env.isEmpty()) { + String[] variables = env.split(COMMA_SPLITTER); + for (String variable : variables) { + variable = variable.trim(); + if (variable.startsWith("$")) { + variable = variable.substring(1); + } + if (!variable.isEmpty()) { + whiteList.add(variable); + } + } + } + return whiteList; + } + + /** + * Get Teardown worker validity interval. + * @return value of master.teardown-worker.validity-interval.ms + */ + public long getTeardownWorkerValidityInterval() { + return Long.parseLong(configsMap.get( + MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/NullTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/NullTask.java new file mode 100644 index 0000000000..c934913108 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/NullTask.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +/** + * Define Null Task. + */ +public class NullTask extends AbstractTask { + + /** + * Null Task initializer. + */ + public NullTask() { + super(); + this.setTaskType(TaskType.NULL); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/SimpleTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/SimpleTask.java new file mode 100644 index 0000000000..1dd87a67bc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/SimpleTask.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +import java.util.Map; + +/** + * Define Simple Task. + * Each Task has only one command + */ +public class SimpleTask extends AbstractTask { + /** + * Simple Task default initializer. + */ + public SimpleTask() { + super(); + this.setTaskType(TaskType.SIMPLE); + } + + /** + * Set up Simple Task with Task object. + * @param task : Task object + */ + public SimpleTask(final Task task) { + this(task.getTaskId(), task.getEnvironment(), task.getTaskCmd(), + task.getTimeout()); + } + + /** + * Create Simple Task with Task details. + * @param taskId : task identifier + * @param environment : task environment + * @param taskCMD : task command + * @param timeout : task timeout + */ + public SimpleTask(final TaskId taskId, final Map environment, + final String taskCMD, final long timeout) { + super(taskId, environment, taskCMD, timeout); + this.setTaskType(TaskType.SIMPLE); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/Task.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/Task.java new file mode 100644 index 0000000000..e6b42ac9c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/Task.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +import java.util.Map; + +import org.apache.hadoop.io.Writable; + +/** + * Define Task Interface. + */ +public interface Task extends Writable { + + /** + * Get TaskId of a Task. + * @return value of TaskId + */ + TaskId getTaskId(); + + /** + * Get Environment of Task. + * @return map of environment + */ + Map getEnvironment(); + + /** + * Get Task cmd. + * @return value of Task cmd such "sleep 1" + */ + String getTaskCmd(); + + /** + * Get Task type such as Simple, Composite. + * @return value of TaskType + */ + TaskType getTaskType(); + + /** + * Set TaskId. + * @param taskId : Task identifier + */ + void setTaskId(TaskId taskId); + + /** + * Set Task environment such as {"HOME":"/user/A"}. + * @param environment : Map of environment variables + */ + void setEnvironment(Map environment); + + /** + * Set Task command. + * @param taskCMD : Task command to be executed + */ + void setTaskCmd(String taskCMD); + + /** + * Get Task Timeout in seconds. + * @return value of TaskTimeout + */ + long getTimeout(); + + /** + * Set Task Timeout. + * @param timeout : value of Task Timeout + */ + void setTimeout(long timeout); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskId.java new file mode 100644 index 0000000000..19f8cbed90 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskId.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.applications.mawo.server.master.job.JobId; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * Defines TaskId for MaWo app. + */ +public class TaskId implements Writable { + + /** + * MaWo TaskIds prefix. + */ + static final String TASK_ID_PREFIX = "mawo_task_"; + + /** + * MaWo Job ID. + */ + private JobId jobId = new JobId(); + /** + * Mawo TaskId. + */ + private long taskId; + + /** + * TaskId constructor. + */ + public TaskId() { + } + + /** + * TaskId constructor with jobId and taskId. + * @param localjobId : Job identifier + * @param id : Task identifier + */ + public TaskId(final JobId localjobId, final int id) { + this.jobId = localjobId; + this.taskId = id; + } + + /** + * Getter method for jobId. + * @return JobID: Job identifier + */ + public final int getJobId() { + return jobId.getID(); + } + + /** + * Getter method for TaskID. + * @return TaskId: Task identifier + */ + public final long getId() { + return taskId; + } + + /** + * Print method for TaskId. + * @return : Full TaskId which is TaskId_prefix + jobId + _ + TaskId + */ + public final String toString() { + return TASK_ID_PREFIX + jobId.getID() + "_" + taskId; + } + + @Override + /** + * Hashcode method for TaskId. + */ + public final int hashCode() { + final int prime = 31; + final int bits = 32; + int result = 1; + int jobHash = 0; + if (jobId == null) { + jobHash = 0; + } else { + jobHash = jobId.hashCode(); + } + result = prime * result + jobHash; + result = prime * result + (int) (taskId ^ (taskId >>> bits)); + return result; + } + + @Override + /** + * Equal method override for TaskId. + */ + public final boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + TaskId other = (TaskId) obj; + if (jobId == null) { + if (other.jobId != null) { + return false; + } + } else if (!jobId.equals(other.jobId)) { + return false; + } + if (taskId != other.taskId) { + return false; + } + return true; + } + + /** {@inheritDoc} */ + public final void write(final DataOutput out) throws IOException { + jobId.write(out); + WritableUtils.writeVLong(out, taskId); + } + + /** {@inheritDoc} */ + public final void readFields(final DataInput in) throws IOException { + jobId = new JobId(); + jobId.readFields(in); + this.taskId = WritableUtils.readVLong(in); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskStatus.java new file mode 100644 index 0000000000..4f780d8dc3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskStatus.java @@ -0,0 +1,347 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.applications.mawo.server.worker.WorkerId; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Defines TaskStatus for MaWo app. + */ +public class TaskStatus implements Writable, Cloneable { + + /** + * Set logger. + */ + private static final Logger LOG = LoggerFactory.getLogger(TaskStatus.class); + + /** + * TaskId is a unique task identifier. + */ + private TaskId taskId = new TaskId(); + /** + * epoch time for a task starttime. + */ + private long startTime; + /** + * epoch time for a task endtime. + */ + private long endTime; + /** + * Unique worker identifier. + */ + private WorkerId workerId = new WorkerId(); + /** + * Task exit code. + */ + private int exitCode = -1; + /** + * Task cmd. + */ + private String taskCMD; + /** + * Task type. + */ + private String taskType; + + /** + * Possible Task States. + */ + public enum State { + /** + * INIT State refers to Task initialization. + */ + INIT, + /** + * RUNNING State refers to Task in Running state. + */ + + RUNNING, + /** + * SUCCEEDED State is assigned when task finishes successfully. + */ + SUCCEEDED, + /** + * FAILED State is assigned when task fails. + */ + FAILED, + /** + * KILLED State refers to when a task is killed. + */ + KILLED, + /** + * EXPIRE State refers to when a task is expired. + */ + EXPIRE + } + + /** + * Current Task state. + */ + private volatile State runState; + + /** + * Task status constructor. + */ + public TaskStatus() { + } + + /** + * Task status constructor with workerId, TaskId, TaskCmd, TaskType. + * @param localworkerId : Worker ID + * @param localtaskId : Task ID + * @param localtaskCMD : Task command line + * @param localtaskType : Type of Task + */ + public TaskStatus(final WorkerId localworkerId, final TaskId localtaskId, + final String localtaskCMD, final String localtaskType) { + this(localworkerId, localtaskId, + TaskStatus.State.INIT, localtaskCMD, + localtaskType); + } + + /** + * Task status constructor with workerId, TaskId, + * TaskCmd, TaskType and Run State. + * @param localworkerId : Worker Id + * @param localtaskId : Task Id + * @param localrunState : Task run State + * @param localtaskCMD : Task cmd + * @param localtaskType : Task type + */ + public TaskStatus(final WorkerId localworkerId, final TaskId localtaskId, + final State localrunState, final String localtaskCMD, + final String localtaskType) { + setWorkerId(localworkerId); + setTaskId(localtaskId); + setRunState(localrunState); + setTaskCMD(localtaskCMD); + setTaskType(localtaskType); + } + + /** + * Get status of a Task. + * @return Status of a Task + */ + public final State getRunState() { + return runState; + } + + /** + * Update status of a Task. + * @param localrunState : Status of a Task + */ + public final void setRunState(final State localrunState) { + this.runState = localrunState; + } + + /** + * Set exitcode of a Task. + * @param localexitCode : Exitcode of a Task + */ + public final void setExitCode(final int localexitCode) { + this.exitCode = localexitCode; + } + + /** + * Get exitcode of a Task. + * @return exitCode of Task + */ + public final int getExitCode() { + return exitCode; + } + + /** + * Set Task cmd of a Task. + * @param localcmd : command line which need to be executed + */ + public final void setTaskCMD(final String localcmd) { + this.taskCMD = localcmd; + } + + /** + * Get Task cmd of a Task. + * @return TaskCmd : command line which need to be executed + */ + public final String getTaskCMD() { + return taskCMD; + } + + /** + * Set Task Type. + * @param localtaskType : TaskType such as SimpleTask, NullTask + */ + public final void setTaskType(final String localtaskType) { + this.taskType = localtaskType; + } + + /** + * Get Task Type. + * @return TaskType : TaskType such as SimpleTask, NullTask + */ + public final String getTaskType() { + return taskType; + } + + /** + * Get Task Id. + * @return TaskId : Task identifier + */ + public final TaskId getTaskId() { + return taskId; + } + + /** + * Set TaskId. + * @param localtaskId : Task identifier + */ + public final void setTaskId(final TaskId localtaskId) { + if (localtaskId != null) { + this.taskId = localtaskId; + } + } + + /** + * Set staus of a Task. + * @param localtaskId : TaskId of a task + * @param localrunState : Run state of a task + */ + public final void setTaskState(final TaskId localtaskId, + final State localrunState) { + setTaskId(localtaskId); + setRunState(localrunState); + } + + /** + * Get Task status of a Task. + * @param localtaskId : Task Id + * @return TaskStatus for valid Task otherwise Null + */ + public final State getTaskState(final TaskId localtaskId) { + if (localtaskId.equals(this.taskId)) { + return getRunState(); + } else { + return null; + } + } + + /** + * Get starttime of a Task. + * @return StartTime of Task + */ + public final long getStartTime() { + return startTime; + } + + /** + * Set current time as start time of a Task. + */ + public final void setStartTime() { + this.startTime = getCurrentTime(); + LOG.debug("Start Time for " + this.taskId + " is " + this.startTime); + } + + /** + * Set task start time to a specific time value. + * @param time : epoch timestamp + */ + private void setStartTime(final long time) { + this.startTime = time; + } + + /** + * Get task end time. + * @return End time of task. + */ + public final long getEndTime() { + return endTime; + } + + /** + * Set task end time to current time. + */ + public final void setEndTime() { + this.setEndTime(getCurrentTime()); + } + + /** + * Set task end time to a specific time value. + * @param time : epoch timestamp + */ + private void setEndTime(final long time) { + this.endTime = time; + LOG.debug("End Time for " + this.taskId + " is " + this.endTime); + } + + /** + * Get current time in milliseconds. + * @return Current time in milliseconds + */ + private long getCurrentTime() { + return System.currentTimeMillis(); + } + + /** {@inheritDoc} */ + public final void write(final DataOutput dataOutput) throws IOException { + workerId.write(dataOutput); + taskId.write(dataOutput); + WritableUtils.writeEnum(dataOutput, runState); + WritableUtils.writeVLong(dataOutput, getStartTime()); + WritableUtils.writeVLong(dataOutput, getEndTime()); + WritableUtils.writeString(dataOutput, taskCMD); + WritableUtils.writeString(dataOutput, taskType); + WritableUtils.writeVInt(dataOutput, exitCode); + } + + + /** {@inheritDoc} */ + public final void readFields(final DataInput dataInput) throws IOException { + workerId.readFields(dataInput); + taskId.readFields(dataInput); + setRunState(WritableUtils.readEnum(dataInput, State.class)); + setStartTime(WritableUtils.readVLong(dataInput)); + setEndTime(WritableUtils.readVLong(dataInput)); + setTaskCMD(WritableUtils.readString(dataInput)); + setTaskType(WritableUtils.readString(dataInput)); + setExitCode(WritableUtils.readVInt(dataInput)); + } + + /** + * Get workerId. + * @return workerId : Worker identifier + */ + public final WorkerId getWorkerId() { + return workerId; + } + + /** + * Set WorkerId. + * @param localworkerId : Worker identifier + */ + public final void setWorkerId(final WorkerId localworkerId) { + this.workerId = localworkerId; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskType.java new file mode 100644 index 0000000000..b7f22ee832 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskType.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +/** + * Define MaWo Task Type. + */ +public enum TaskType { + /** + * Its a Simple Task. + */ + SIMPLE, + /** + * Its a composite task which consists of multiple simple tasks. + */ + COMPOSITE, + /** + * Its a null task. + */ + NULL, + /** + * Die Task is to signal for suicide. + */ + DIE, + /** + * Teardown Task is a task which runs after all tasks are finished. + */ + TEARDOWN +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TeardownTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TeardownTask.java new file mode 100644 index 0000000000..959e2cb468 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/TeardownTask.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +import java.util.Map; + +/** + * Define Teardown Task. + */ +public class TeardownTask extends SimpleTask { + /** + * Teardown task default constructor. + */ + public TeardownTask() { + super(); + setTaskType(TaskType.TEARDOWN); + } + + /** + * Teardown Task constructor. + * @param taskId : Teardown task Id + * @param environment : Environment map for teardown task + * @param taskCMD : Teardown task command + * @param timeout : Timeout for Teardown task + */ + public TeardownTask(final TaskId taskId, + final Map environment, + final String taskCMD, final long timeout) { + super(taskId, environment, taskCMD, timeout); + setTaskType(TaskType.TEARDOWN); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/WorkAssignmentProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/WorkAssignmentProtocol.java new file mode 100644 index 0000000000..39cb283a4a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/WorkAssignmentProtocol.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + +import org.apache.hadoop.applications.mawo.server.worker.WorkerId; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolInfo; + +/** + * Define work assignment protocol. + */ +@ProtocolInfo(protocolName = "WorkAssignmentProtocol", protocolVersion = 1) +public interface WorkAssignmentProtocol { + + /** + * Get next workerId to which new task will be assigned. + * @return return workerId text + */ + Text getNewWorkerId(); + + /** + * Register Worker. + * When worker will be launched first, it needs to be registered with Master. + * @param workerId : Worker Id + * @return Task instance + */ + Task registerWorker(WorkerId workerId); + + /** + * De Register worker. + * When worker is de-registered, no new task will be assigned to this worker. + * @param workerId : Worker identifier + */ + void deRegisterWorker(WorkerId workerId); + + /** + * Worker sends heartbeat to Master. + * @param workerId : Worker Id + * @param taskStatusList : TaskStatus list of all tasks assigned to worker. + * @return Task instance + */ + Task sendHeartbeat(WorkerId workerId, TaskStatus[] taskStatusList); + + /** + * Add Task to the list. + * @param task : Task object + */ + void addTask(Task task); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/package-info.java new file mode 100644 index 0000000000..f00c547a25 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/common/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.applications.mawo.server.common; +/** + * Helper classes for the mawo server common operations. + */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobId.java new file mode 100644 index 0000000000..f056f5787f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobId.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.master.job; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * Define MaWo JobId. + */ +public class JobId implements Writable { + + /** + * MaWo job prefix. + */ + private static final String JOB_PREFIX = "mawo_job_"; + + /** + * Create unique random JobId. + * @return unique random JobId + */ + static JobId newJobId() { + Random rn = new Random(); + final int range = 900000; + final int randomadd = 100000; + int randomNum = rn.nextInt(range) + randomadd; + return new JobId(randomNum); + } + + /** + * Unique Id. + */ + private int jobIdentifier; + + /** + * JobId default constructor. + */ + public JobId() { + + } + + /** + * JobId constructor with Id. + * @param id : unique id + */ + public JobId(final int id) { + this.jobIdentifier = id; + } + + /** + * Get JobId. + * @return unique ID + */ + public final int getID() { + return jobIdentifier; + } + + /** + * Print JobId. + * @return JobId + */ + public final String toString() { + return JOB_PREFIX + jobIdentifier; + } + + @Override + /** + * Hashcode for jobId. + */ + public final int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + jobIdentifier; + return result; + } + + @Override + /** + * Implement equals method for jobId. + */ + public final boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + JobId other = (JobId) obj; + if (jobIdentifier != other.jobIdentifier) { + return false; + } + return true; + } + + /** {@inheritDoc} */ + public final void write(final DataOutput out) throws IOException { + WritableUtils.writeVInt(out, jobIdentifier); + } + + /** {@inheritDoc} */ + public final void readFields(final DataInput in) throws IOException { + this.jobIdentifier = WritableUtils.readVInt(in); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/package-info.java new file mode 100644 index 0000000000..c9453805d1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.applications.mawo.server.master.job; +/** + * Helper classes for the mawo master job. + */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/worker/WorkerId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/worker/WorkerId.java new file mode 100644 index 0000000000..dfb356b105 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/worker/WorkerId.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.worker; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * Define WorkerId for Workers. + */ +public class WorkerId implements Writable { + /** + * WorkerId is a unique identifier for workers. + */ + private Text workerId = new Text(); + /** + * Hostname of worker node. + */ + private Text hostname = new Text(); + /** + * Ip address of worker node. + */ + private Text ipAdd = new Text(); + + /** + * Default constructor for workerId. + * Set Hostname and Ip address of the machine where worker is running. + */ + public WorkerId() { + try { + this.hostname = + new Text(InetAddress.getLocalHost().getHostName()); + this.ipAdd = + new Text(InetAddress.getLocalHost().getHostAddress().toString()); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + + /** + * Get hostname for Worker. + * @return hostname of worker node + */ + public final Text getHostname() { + return hostname; + } + + /** + * Set hostname for Worker. + * @param wkhostname : Hostname of worker + */ + public final void setHostname(final Text wkhostname) { + this.hostname = wkhostname; + } + + /** + * Get Worker IP address. + * @return IP address of worker node + */ + public final String getIPAddress() { + return this.ipAdd.toString(); + } + + /** + * Print workerId. + * @return workeId in string + */ + public final String toString() { + return workerId.toString(); + } + + /** + * Get workerId. + * @return workerId : Worker identifier + */ + public final String getWorkerId() { + return this.workerId.toString(); + } + + /** + * Set workerId. + * @param localworkerId : Worker identifier + */ + public final void setWorkerId(final String localworkerId) { + this.workerId = new Text(localworkerId); + } + + @Override + /** + * Implememt equals method for WorkerId. + */ + public final boolean equals(final Object o) { + WorkerId x = (WorkerId) o; + return x.getHostname().equals(this.hostname); + } + + /** {@inheritDoc} */ + public final void write(final DataOutput dataOutput) throws IOException { + workerId.write(dataOutput); + hostname.write(dataOutput); + ipAdd.write(dataOutput); + } + + /** {@inheritDoc} */ + public final void readFields(final DataInput dataInput) throws IOException { + workerId.readFields(dataInput); + hostname.readFields(dataInput); + ipAdd.readFields(dataInput); + } + + @Override + /** + * Override hashcode method for WorkerId. + */ + public final int hashCode() { + final int prime = 31; + int result = 1; + int workerHash = 0; + if (workerId == null) { + workerHash = 0; + } else { + workerHash = workerId.hashCode(); + } + int hostHash = 0; + if (hostname == null) { + hostHash = 0; + } else { + hostHash = hostname.hashCode(); + } + int ipHash = 0; + if (ipAdd == null) { + ipHash = 0; + } else { + ipHash = ipAdd.hashCode(); + } + result = prime * result + workerHash; + result = prime * result + hostHash; + result = prime * result + ipHash; + return result; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/worker/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/worker/package-info.java new file mode 100644 index 0000000000..7fa0d55b6e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/java/org/apache/hadoop/applications/mawo/server/worker/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.applications.mawo.server.worker; +/** + * Helper classes for the mawo worker. + */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/resources/mawo-default.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/resources/mawo-default.properties new file mode 100644 index 0000000000..5ebe59eaa1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/main/resources/mawo-default.properties @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +rpc.server.hostname=localhost +rpc.server.port=5120 + +#curator related configurations +zookeeper.parent.path=/mawoRoot +zookeeper.address=localhost:2181 +zookeeper.retry.interval.ms=1000 +zookeeper.session.timeout.ms=10000 +zookeeper.retries.num=1000 +zookeeper.acl=world:anyone:rwcda +worker.num.tasks=10 +mawo.job-queue-storage.enabled=true +mawo.job-builder.class=org.apache.hadoop.applications.mawo.server.master.job.SimpleTaskJsonJobBuilder +worker.workspace=/tmp + +ycloud.url=0.0.0.0:9191 + +task.block-size.min=10 +task.uber-time.min=12 + +master.tasks-status.log.path=/tmp + +master.teardown-worker.validity-interval.ms=120000 +#a comma-separated list of environment variables needed by all the tasks +worker.whitelist.env= diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/test/java/org/apache/hadoop/applications/mawo/server/common/TestMaWoConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/test/java/org/apache/hadoop/applications/mawo/server/common/TestMaWoConfiguration.java new file mode 100644 index 0000000000..e189bcb8f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/test/java/org/apache/hadoop/applications/mawo/server/common/TestMaWoConfiguration.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.applications.mawo.server.common; + + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test MaWo configuration. + */ +public class TestMaWoConfiguration { + + /** + * Validate default MaWo Configurations. + */ + @Test + public void testMaWoConfiguration() { + + MawoConfiguration mawoConf = new MawoConfiguration(); + + // validate Rpc server port + Assert.assertEquals(mawoConf.getRpcServerPort(), 5120); + + // validate Rpc hostname + Assert.assertTrue("localhost".equals(mawoConf.getRpcHostName())); + + // validate job queue storage conf + boolean jobQueueStorage = mawoConf.getJobQueueStorageEnabled(); + Assert.assertTrue(jobQueueStorage); + + // validate default teardownWorkerValidity Interval + Assert.assertEquals(mawoConf.getTeardownWorkerValidityInterval(), 120000); + + // validate Zk related configs + Assert.assertTrue("/tmp/mawoRoot".equals(mawoConf.getZKParentPath())); + Assert.assertTrue("localhost:2181".equals(mawoConf.getZKAddress())); + Assert.assertEquals(1000, mawoConf.getZKRetryIntervalMS()); + Assert.assertEquals(10000, mawoConf.getZKSessionTimeoutMS()); + Assert.assertEquals(1000, mawoConf.getZKRetriesNum()); + } + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/test/resources/mawo.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/test/resources/mawo.properties new file mode 100644 index 0000000000..0d5ea31589 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/hadoop-yarn-applications-mawo-core/src/test/resources/mawo.properties @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +rpc.server.hostname=localhost +rpc.server.port=5120 + +#curator related configurations +zookeeper.parent.path=/tmp/mawoRoot +zookeeper.address=localhost:2181 +zookeeper.retry.interval.ms=1000 +zookeeper.session.timeout.ms=10000 +zookeeper.retries.num=1000 +zookeeper.acl=world:anyone:rwcda +worker.num.tasks=10 +mawo.job-queue-storage.enabled=true diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/pom.xml new file mode 100644 index 0000000000..4b710226c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/pom.xml @@ -0,0 +1,37 @@ + + + + + hadoop-yarn-applications + org.apache.hadoop + 3.3.0-SNAPSHOT + + 4.0.0 + + org.apache.hadoop.applications.mawo + hadoop-applications-mawo + pom + + Apache Hadoop YARN Application MaWo + http://maven.apache.org + + + UTF-8 + + + + hadoop-yarn-applications-mawo-core + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml index 78b709a45a..17a5b495fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/pom.xml @@ -38,6 +38,7 @@ hadoop-yarn-applications-unmanaged-am-launcher hadoop-yarn-services hadoop-yarn-applications-catalog + hadoop-yarn-applications-mawo diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/MasterWorker.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/MasterWorker.md new file mode 100644 index 0000000000..ca49d30266 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/MasterWorker.md @@ -0,0 +1,36 @@ + + +# MaWo: A Master Worker framework on YARN Services + +## Overview + +MaWo is a YARN service based framework which handles Master Worker based workload. +This is an app which can take an input job specification with tasks, their expected durations and have a Master dish the tasks off to a predetermined set of workers. +The components will be responsible to finish the job within specific time duration. + +## MaWo Components + +MaWo app is a YARN Service Application. It has mainly two components. + +* Master + - Read MaWo-Payload file and create a queue of Tasks + - Register Worker + - Assign tasks to worker nodes + - Monitor status of Tasks + - Log Task status + +* Worker + - Send heartbeat to Worker + - Execute Task \ No newline at end of file