diff --git a/CHANGES.txt b/CHANGES.txt index 66f44ecbe5..68c2b729a3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,6 +14,8 @@ Trunk (unreleased changes) and the init of the class is made to take a Configuration argument. (Jakob Homan via ddas) + HADOOP-6108. Add support for EBS storage on EC2. (tomwhite) + IMPROVEMENTS HADOOP-6283. Improve the exception messages thrown by diff --git a/build.xml b/build.xml index 8f49ab6b38..61470d2177 100644 --- a/build.xml +++ b/build.xml @@ -1200,6 +1200,7 @@ + diff --git a/src/contrib/cloud/README.txt b/src/contrib/cloud/README.txt new file mode 100644 index 0000000000..76f2a72b9b --- /dev/null +++ b/src/contrib/cloud/README.txt @@ -0,0 +1,307 @@ +Hadoop Cloud Scripts +==================== + +These scripts allow you to run Hadoop on cloud providers. Currently only Amazon +EC2 is supported, but more providers are expected to be added over time. + +Getting Started +=============== + +First, unpack the scripts on your system. For convenience, you may like to put +the top-level directory on your path. + +You'll also need python (version 2.5 or newer) and the boto and simplejson +libraries. After you download boto and simplejson, you can install each in turn +by running the following in the directory where you unpacked the distribution: + +% sudo python setup.py install + +Alternatively, you might like to use the python-boto and python-simplejson RPM +and Debian packages. + +You need to tell the scripts your AWS credentials. The simplest way to do this +is to set the environment variables (but see +http://code.google.com/p/boto/wiki/BotoConfig for other options): + + * AWS_ACCESS_KEY_ID - Your AWS Access Key ID + * AWS_SECRET_ACCESS_KEY - Your AWS Secret Access Key + +To configure the scripts, create a directory called .hadoop-cloud (note the +leading ".") in your home directory. In it, create a file called +clusters.cfg with a section for each cluster you want to control. e.g.: + +[my-hadoop-cluster] +image_id=ami-6159bf08 +instance_type=c1.medium +key_name=tom +availability_zone=us-east-1c +private_key=PATH_TO_PRIVATE_KEY +ssh_options=-i %(private_key)s -o StrictHostKeyChecking=no + +The image chosen here is one with a i386 Fedora OS. For a list of suitable AMIs +see http://wiki.apache.org/hadoop/AmazonEC2. + +The architecture must be compatible with the instance type. For m1.small and +c1.medium instances use the i386 AMIs, while for m1.large, m1.xlarge, and +c1.xlarge instances use the x86_64 AMIs. One of the high CPU instances +(c1.medium or c1.xlarge) is recommended. + +Then you can run the hadoop-ec2 script. It will display usage instructions when +invoked without arguments. + +You can test that it can connect to AWS by typing: + +% hadoop-ec2 list + +LAUNCHING A CLUSTER +=================== + +To launch a cluster called "my-hadoop-cluster" with 10 worker (slave) nodes +type: + +% hadoop-ec2 launch-cluster my-hadoop-cluster 10 + +This will boot the master node and 10 worker nodes. When the nodes have started +and the Hadoop cluster has come up, the console will display a message like + + Browse the cluster at http://ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com/ + +You can access Hadoop's web UI by visiting this URL. By default, port 80 is +opened for access from your client machine. You may change the firewall settings +(to allow access from a network, rather than just a single machine, for example) +by using the Amazon EC2 command line tools, or by using a tool like Elastic Fox. +The security group to change is the one named -master. 
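+For example, if you have the Amazon EC2 command line tools installed, you
+could open the web UI to your whole network with something like the following
+(the group name assumes the cluster is called "my-hadoop-cluster", and the
+CIDR shown is only an illustration; substitute your own network):
+
+% ec2-authorize my-hadoop-cluster-master -P tcp -p 80 -s 192.0.2.0/24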
+ +For security reasons, traffic from the network your client is running on is +proxied through the master node of the cluster using an SSH tunnel (a SOCKS +proxy on port 6666). To set up the proxy run the following command: + +% hadoop-ec2 proxy my-hadoop-cluster + +Web browsers need to be configured to use this proxy too, so you can view pages +served by worker nodes in the cluster. The most convenient way to do this is to +use a proxy auto-config (PAC) file, such as this one: + + http://apache-hadoop-ec2.s3.amazonaws.com/proxy.pac + +If you are using Firefox, then you may find +FoxyProxy useful for managing PAC files. (If you use FoxyProxy, then you need to +get it to use the proxy for DNS lookups. To do this, go to Tools -> FoxyProxy -> +Options, and then under "Miscellaneous" in the bottom left, choose "Use SOCKS +proxy for DNS lookups".) + +PERSISTENT CLUSTERS +=================== + +Hadoop clusters running on EC2 that use local EC2 storage (the default) will not +retain data once the cluster has been terminated. It is possible to use EBS for +persistent data, which allows a cluster to be shut down while it is not being +used. + +Note: EBS support is a Beta feature. + +First create a new section called "my-ebs-cluster" in the +.hadoop-cloud/clusters.cfg file. + +Now we need to create storage for the new cluster. Create a temporary EBS volume +of size 100GiB, format it, and save it as a snapshot in S3. This way, we only +have to do the formatting once. + +% hadoop-ec2 create-formatted-snapshot my-ebs-cluster 100 + +We create storage for a single master and for two slaves. The volumes to create +are described in a JSON spec file, which references the snapshot we just +created. Here is the contents of a JSON file, called +my-ebs-cluster-storage-spec.json: + +{ + "master": [ + { + "device": "/dev/sdj", + "mount_point": "/ebs1", + "size_gb": "100", + "snapshot_id": "snap-268e704f" + }, + { + "device": "/dev/sdk", + "mount_point": "/ebs2", + "size_gb": "100", + "snapshot_id": "snap-268e704f" + } + ], + "slave": [ + { + "device": "/dev/sdj", + "mount_point": "/ebs1", + "size_gb": "100", + "snapshot_id": "snap-268e704f" + }, + { + "device": "/dev/sdk", + "mount_point": "/ebs2", + "size_gb": "100", + "snapshot_id": "snap-268e704f" + } + ] +} + + +Each role (here "master" and "slave") is the key to an array of volume +specifications. In this example, the "slave" role has two devices ("/dev/sdj" +and "/dev/sdk") with different mount points, sizes, and generated from an EBS +snapshot. The snapshot is the formatted snapshot created earlier, so that the +volumes we create are pre-formatted. The size of the drives must match the size +of the snapshot created earlier. + +Let's create actual volumes using this file. + +% hadoop-ec2 create-storage my-ebs-cluster master 1 \ + my-ebs-cluster-storage-spec.json +% hadoop-ec2 create-storage my-ebs-cluster slave 2 \ + my-ebs-cluster-storage-spec.json + +Now let's start the cluster with 2 slave nodes: + +% hadoop-ec2 launch-cluster my-ebs-cluster 2 + +Login and run a job which creates some output. + +% hadoop-ec2 login my-ebs-cluster + +# hadoop fs -mkdir input +# hadoop fs -put /etc/hadoop/conf/*.xml input +# hadoop jar /usr/lib/hadoop/hadoop-*-examples.jar grep input output \ + 'dfs[a-z.]+' + +Look at the output: + +# hadoop fs -cat output/part-00000 | head + +Now let's shutdown the cluster. + +% hadoop-ec2 terminate-cluster my-ebs-cluster + +A little while later we restart the cluster and login. 
+
+% hadoop-ec2 launch-cluster my-ebs-cluster 2
+% hadoop-ec2 login my-ebs-cluster
+
+The output from the job we ran before should still be there:
+
+# hadoop fs -cat output/part-00000 | head
+
+RUNNING JOBS
+============
+
+When you launched the cluster, a hadoop-site.xml file was created in the
+directory ~/.hadoop-cloud/<cluster-name>. You can use this to connect to the
+cluster by setting the HADOOP_CONF_DIR environment variable (it is also possible
+to set the configuration file to use by passing it as a -conf option to Hadoop
+Tools):
+
+% export HADOOP_CONF_DIR=~/.hadoop-cloud/my-hadoop-cluster
+
+Let's try browsing HDFS:
+
+% hadoop fs -ls /
+
+Running a job is straightforward:
+
+% hadoop fs -mkdir input # create an input directory
+% hadoop fs -put $HADOOP_HOME/LICENSE.txt input # copy a file there
+% hadoop jar $HADOOP_HOME/hadoop-*-examples.jar wordcount input output
+% hadoop fs -cat output/part-00000 | head
+
+Of course, these examples assume that you have installed Hadoop on your local
+machine. It is also possible to launch jobs from within the cluster. First log
+into the master node:
+
+% hadoop-ec2 login my-hadoop-cluster
+
+Then run a job as before:
+
+# hadoop fs -mkdir input
+# hadoop fs -put /etc/hadoop/conf/*.xml input
+# hadoop jar /usr/lib/hadoop/hadoop-*-examples.jar grep input output 'dfs[a-z.]+'
+# hadoop fs -cat output/part-00000 | head
+
+TERMINATING A CLUSTER
+=====================
+
+When you've finished with your cluster you can stop it with the following
+command.
+
+NOTE: ALL DATA WILL BE LOST UNLESS YOU ARE USING EBS!
+
+% hadoop-ec2 terminate-cluster my-hadoop-cluster
+
+You can then delete the EC2 security groups with:
+
+% hadoop-ec2 delete-cluster my-hadoop-cluster
+
+AUTOMATIC CLUSTER SHUTDOWN
+==========================
+
+You may use the --auto-shutdown option to automatically terminate a cluster
+a given number of minutes after launch. This is useful for short-lived
+clusters where the jobs complete in a known amount of time.
+
+If you want to cancel the automatic shutdown, then run the following. (The
+first command cancels the shutdown on the master; the next two cancel it on
+the slaves.)
+
+% hadoop-ec2 exec my-hadoop-cluster shutdown -c
+% hadoop-ec2 update-slaves-file my-hadoop-cluster
+% hadoop-ec2 exec my-hadoop-cluster /usr/lib/hadoop/bin/slaves.sh shutdown -c
+
+CONFIGURATION NOTES
+===================
+
+It is possible to specify options on the command line: these take precedence
+over any specified in the configuration file. For example:
+
+% hadoop-ec2 launch-cluster --image-id ami-2359bf4a --instance-type c1.xlarge \
+  my-hadoop-cluster 10
+
+This command launches a 10-node cluster using the specified image and instance
+type, overriding the equivalent settings (if any) that are in the
+"my-hadoop-cluster" section of the configuration file. Note that words in
+options are separated by hyphens (--instance-type), while words in the
+corresponding configuration parameter are separated by underscores
+(instance_type).
+
+The scripts install Hadoop RPMs or Debian packages (depending on the OS) at
+instance boot time.
+
+By default, Apache Hadoop 0.20.1 is installed. You can also run other versions
+of Apache Hadoop. For example, the following uses version 0.18.3:
+
+% hadoop-ec2 launch-cluster --env HADOOP_VERSION=0.18.3 \
+  my-hadoop-cluster 10
+
+CUSTOMIZATION
+=============
+
+You can specify a list of packages to install on every instance at boot time
+using the --user-packages command-line option (or the user_packages
+configuration parameter). Packages should be space-separated. Note that package
+names should reflect the package manager being used to install them (yum or
+apt-get depending on the OS).
+
+Here's an example that installs RPMs for R and git:
+
+% hadoop-ec2 launch-cluster --user-packages 'R git-core' my-hadoop-cluster 10
+
+You have full control over the script that is run when each instance boots. The
+default script, hadoop-ec2-init-remote.sh, may be used as a starting point to
+add extra configuration or customization of the instance. Make a copy of the
+script in your home directory, or somewhere similar, and set the
+--user-data-file command-line option (or the user_data_file configuration
+parameter) to point to the (modified) copy. hadoop-ec2 will replace "%ENV%"
+in your user data script with
+USER_PACKAGES, AUTO_SHUTDOWN, and EBS_MAPPINGS, as well as extra parameters
+supplied using the --env command-line flag.
+
+Another way of customizing the instance, which may be more appropriate for
+larger changes, is to create your own image.
+
+It's possible to use any image, as long as it i) runs (gzip compressed) user
+data on boot, and ii) has Java installed.
+
diff --git a/src/contrib/cloud/src/integration-test/create-ebs-snapshot.sh b/src/contrib/cloud/src/integration-test/create-ebs-snapshot.sh
new file mode 100644
index 0000000000..971bde928f
--- /dev/null
+++ b/src/contrib/cloud/src/integration-test/create-ebs-snapshot.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This script tests the "hadoop-ec2 create-formatted-snapshot" command.
+# The snapshot is deleted immediately afterwards.
+# +# Example usage: +# ./create-ebs-snapshot.sh +# + +set -e +set -x + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +WORKSPACE=${WORKSPACE:-`pwd`} +CONFIG_DIR=${CONFIG_DIR:-$WORKSPACE/.hadoop-cloud} +CLUSTER=${CLUSTER:-hadoop-cloud-$USER-test-cluster} +AVAILABILITY_ZONE=${AVAILABILITY_ZONE:-us-east-1c} +KEY_NAME=${KEY_NAME:-$USER} +HADOOP_CLOUD_HOME=${HADOOP_CLOUD_HOME:-$bin/../py} +HADOOP_CLOUD_PROVIDER=${HADOOP_CLOUD_PROVIDER:-ec2} +SSH_OPTIONS=${SSH_OPTIONS:-"-i ~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME \ + -o StrictHostKeyChecking=no"} + +HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-$HADOOP_CLOUD_PROVIDER + +$HADOOP_CLOUD_SCRIPT create-formatted-snapshot --config-dir=$CONFIG_DIR \ + --key-name=$KEY_NAME --availability-zone=$AVAILABILITY_ZONE \ + --ssh-options="$SSH_OPTIONS" \ + $CLUSTER 1 > out.tmp + +snapshot_id=`grep 'Created snapshot' out.tmp | awk '{print $3}'` + +ec2-delete-snapshot $snapshot_id + +rm -f out.tmp diff --git a/src/contrib/cloud/src/integration-test/ebs-storage-spec.json b/src/contrib/cloud/src/integration-test/ebs-storage-spec.json new file mode 100644 index 0000000000..28a17a2ad0 --- /dev/null +++ b/src/contrib/cloud/src/integration-test/ebs-storage-spec.json @@ -0,0 +1,30 @@ +{ + "master": [ + { + "device": "/dev/sdj", + "mount_point": "/ebs1", + "size_gb": "7", + "snapshot_id": "snap-fe44bb97" + }, + { + "device": "/dev/sdk", + "mount_point": "/ebs2", + "size_gb": "7", + "snapshot_id": "snap-fe44bb97" + } + ], + "slave": [ + { + "device": "/dev/sdj", + "mount_point": "/ebs1", + "size_gb": "7", + "snapshot_id": "snap-fe44bb97" + }, + { + "device": "/dev/sdk", + "mount_point": "/ebs2", + "size_gb": "7", + "snapshot_id": "snap-fe44bb97" + } + ] +} \ No newline at end of file diff --git a/src/contrib/cloud/src/integration-test/persistent-cluster.sh b/src/contrib/cloud/src/integration-test/persistent-cluster.sh new file mode 100644 index 0000000000..52daf146b7 --- /dev/null +++ b/src/contrib/cloud/src/integration-test/persistent-cluster.sh @@ -0,0 +1,122 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This script tests the Hadoop cloud scripts by running through a minimal +# sequence of steps to start a persistent (EBS) cluster, run a job, then +# shutdown the cluster. 
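+#
+# Note: your AWS credentials (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, or
+# an equivalent boto configuration) must be set up as described in the README
+# so that the hadoop-ec2 commands invoked below can talk to EC2.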
+# +# Example usage: +# HADOOP_HOME=~/dev/hadoop-0.20.1/ ./persistent-cluster.sh +# + +function wait_for_volume_detachment() { + set +e + set +x + while true; do + attached=`$HADOOP_CLOUD_SCRIPT list-storage --config-dir=$CONFIG_DIR \ + $CLUSTER | awk '{print $6}' | grep 'attached'` + sleep 5 + if [ -z "$attached" ]; then + break + fi + done + set -e + set -x +} + +set -e +set -x + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +WORKSPACE=${WORKSPACE:-`pwd`} +CONFIG_DIR=${CONFIG_DIR:-$WORKSPACE/.hadoop-cloud} +CLUSTER=${CLUSTER:-hadoop-cloud-ebs-$USER-test-cluster} +IMAGE_ID=${IMAGE_ID:-ami-6159bf08} # default to Fedora 32-bit AMI +AVAILABILITY_ZONE=${AVAILABILITY_ZONE:-us-east-1c} +KEY_NAME=${KEY_NAME:-$USER} +AUTO_SHUTDOWN=${AUTO_SHUTDOWN:-15} +LOCAL_HADOOP_VERSION=${LOCAL_HADOOP_VERSION:-0.20.1} +HADOOP_HOME=${HADOOP_HOME:-$WORKSPACE/hadoop-$LOCAL_HADOOP_VERSION} +HADOOP_CLOUD_HOME=${HADOOP_CLOUD_HOME:-$bin/../py} +HADOOP_CLOUD_PROVIDER=${HADOOP_CLOUD_PROVIDER:-ec2} +SSH_OPTIONS=${SSH_OPTIONS:-"-i ~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME \ + -o StrictHostKeyChecking=no"} + +HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-$HADOOP_CLOUD_PROVIDER +export HADOOP_CONF_DIR=$CONFIG_DIR/$CLUSTER + +# Install Hadoop locally +if [ ! -d $HADOOP_HOME ]; then + wget http://archive.apache.org/dist/hadoop/core/hadoop-\ +$LOCAL_HADOOP_VERSION/hadoop-$LOCAL_HADOOP_VERSION.tar.gz + tar zxf hadoop-$LOCAL_HADOOP_VERSION.tar.gz -C $WORKSPACE + rm hadoop-$LOCAL_HADOOP_VERSION.tar.gz +fi + +# Create storage +$HADOOP_CLOUD_SCRIPT create-storage --config-dir=$CONFIG_DIR \ + --availability-zone=$AVAILABILITY_ZONE $CLUSTER master 1 \ + $bin/ebs-storage-spec.json +$HADOOP_CLOUD_SCRIPT create-storage --config-dir=$CONFIG_DIR \ + --availability-zone=$AVAILABILITY_ZONE $CLUSTER slave 1 \ + $bin/ebs-storage-spec.json + +# Launch a cluster +$HADOOP_CLOUD_SCRIPT launch-cluster --config-dir=$CONFIG_DIR \ + --image-id=$IMAGE_ID --key-name=$KEY_NAME --auto-shutdown=$AUTO_SHUTDOWN \ + --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER 1 + +# Run a proxy and save its pid in HADOOP_CLOUD_PROXY_PID +eval `$HADOOP_CLOUD_SCRIPT proxy --config-dir=$CONFIG_DIR \ + --ssh-options="$SSH_OPTIONS" $CLUSTER` + +# Run a job and check it works +$HADOOP_HOME/bin/hadoop fs -mkdir input +$HADOOP_HOME/bin/hadoop fs -put $HADOOP_HOME/LICENSE.txt input +$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-*-examples.jar grep \ + input output Apache +# following returns a non-zero exit code if no match +$HADOOP_HOME/bin/hadoop fs -cat 'output/part-00000' | grep Apache + +# Shutdown the cluster +kill $HADOOP_CLOUD_PROXY_PID +$HADOOP_CLOUD_SCRIPT terminate-cluster --config-dir=$CONFIG_DIR --force $CLUSTER +sleep 5 # wait for termination to take effect + +# Relaunch the cluster +$HADOOP_CLOUD_SCRIPT launch-cluster --config-dir=$CONFIG_DIR \ + --image-id=$IMAGE_ID --key-name=$KEY_NAME --auto-shutdown=$AUTO_SHUTDOWN \ + --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER 1 + +# Run a proxy and save its pid in HADOOP_CLOUD_PROXY_PID +eval `$HADOOP_CLOUD_SCRIPT proxy --config-dir=$CONFIG_DIR \ + --ssh-options="$SSH_OPTIONS" $CLUSTER` + +# Check output is still there +$HADOOP_HOME/bin/hadoop fs -cat 'output/part-00000' | grep Apache + +# Shutdown the cluster +kill $HADOOP_CLOUD_PROXY_PID +$HADOOP_CLOUD_SCRIPT terminate-cluster --config-dir=$CONFIG_DIR --force $CLUSTER +sleep 5 # wait for termination to take effect + +# Cleanup +$HADOOP_CLOUD_SCRIPT delete-cluster --config-dir=$CONFIG_DIR $CLUSTER +wait_for_volume_detachment 
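+# Volumes can only be deleted once they are detached from their instances;
+# wait_for_volume_detachment (defined above) polls list-storage until no
+# volume is reported as attached, so it is safe to delete the storage here.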
+$HADOOP_CLOUD_SCRIPT delete-storage --config-dir=$CONFIG_DIR --force $CLUSTER diff --git a/src/contrib/cloud/src/integration-test/transient-cluster.sh b/src/contrib/cloud/src/integration-test/transient-cluster.sh new file mode 100644 index 0000000000..ca9e7bc025 --- /dev/null +++ b/src/contrib/cloud/src/integration-test/transient-cluster.sh @@ -0,0 +1,77 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This script tests the Hadoop cloud scripts by running through a minimal +# sequence of steps to start a cluster, run a job, then shutdown the cluster. +# +# Example usage: +# HADOOP_HOME=~/dev/hadoop-0.20.1/ ./transient-cluster.sh +# + +set -e +set -x + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +WORKSPACE=${WORKSPACE:-`pwd`} +CONFIG_DIR=${CONFIG_DIR:-$WORKSPACE/.hadoop-cloud} +CLUSTER=${CLUSTER:-hadoop-cloud-$USER-test-cluster} +IMAGE_ID=${IMAGE_ID:-ami-6159bf08} # default to Fedora 32-bit AMI +AVAILABILITY_ZONE=${AVAILABILITY_ZONE:-us-east-1c} +KEY_NAME=${KEY_NAME:-$USER} +AUTO_SHUTDOWN=${AUTO_SHUTDOWN:-15} +LOCAL_HADOOP_VERSION=${LOCAL_HADOOP_VERSION:-0.20.1} +HADOOP_HOME=${HADOOP_HOME:-$WORKSPACE/hadoop-$LOCAL_HADOOP_VERSION} +HADOOP_CLOUD_HOME=${HADOOP_CLOUD_HOME:-$bin/../py} +HADOOP_CLOUD_PROVIDER=${HADOOP_CLOUD_PROVIDER:-ec2} +SSH_OPTIONS=${SSH_OPTIONS:-"-i ~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME \ + -o StrictHostKeyChecking=no"} + +HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-$HADOOP_CLOUD_PROVIDER +export HADOOP_CONF_DIR=$CONFIG_DIR/$CLUSTER + +# Install Hadoop locally +if [ ! 
-d $HADOOP_HOME ]; then + wget http://archive.apache.org/dist/hadoop/core/hadoop-\ +$LOCAL_HADOOP_VERSION/hadoop-$LOCAL_HADOOP_VERSION.tar.gz + tar zxf hadoop-$LOCAL_HADOOP_VERSION.tar.gz -C $WORKSPACE + rm hadoop-$LOCAL_HADOOP_VERSION.tar.gz +fi + +# Launch a cluster +$HADOOP_CLOUD_SCRIPT launch-cluster --config-dir=$CONFIG_DIR \ + --image-id=$IMAGE_ID --key-name=$KEY_NAME --auto-shutdown=$AUTO_SHUTDOWN \ + --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER 1 + +# Run a proxy and save its pid in HADOOP_CLOUD_PROXY_PID +eval `$HADOOP_CLOUD_SCRIPT proxy --config-dir=$CONFIG_DIR \ + --ssh-options="$SSH_OPTIONS" $CLUSTER` + +# Run a job and check it works +$HADOOP_HOME/bin/hadoop fs -mkdir input +$HADOOP_HOME/bin/hadoop fs -put $HADOOP_HOME/LICENSE.txt input +$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-*-examples.jar grep \ + input output Apache +# following returns a non-zero exit code if no match +$HADOOP_HOME/bin/hadoop fs -cat 'output/part-00000' | grep Apache + +# Shutdown the cluster +kill $HADOOP_CLOUD_PROXY_PID +$HADOOP_CLOUD_SCRIPT terminate-cluster --config-dir=$CONFIG_DIR --force $CLUSTER +sleep 5 # wait for termination to take effect +$HADOOP_CLOUD_SCRIPT delete-cluster --config-dir=$CONFIG_DIR $CLUSTER diff --git a/src/contrib/cloud/src/py/VERSION b/src/contrib/cloud/src/py/VERSION new file mode 100644 index 0000000000..a7f3fc27a7 --- /dev/null +++ b/src/contrib/cloud/src/py/VERSION @@ -0,0 +1 @@ +0.22.0 \ No newline at end of file diff --git a/src/contrib/cloud/src/py/hadoop-ec2 b/src/contrib/cloud/src/py/hadoop-ec2 new file mode 100644 index 0000000000..ef0c17295a --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop-ec2 @@ -0,0 +1,21 @@ +#!/usr/bin/env python2.5 + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hadoop.cloud.cli import main + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh b/src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh new file mode 100644 index 0000000000..fee8f1d66c --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh @@ -0,0 +1,530 @@ +#!/bin/bash -x + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +################################################################################ +# Script that is run on each EC2 instance on boot. It is passed in the EC2 user +# data, so should not exceed 16K in size after gzip compression. +# +# This script is executed by /etc/init.d/ec2-run-user-data, and output is +# logged to /var/log/messages. +################################################################################ + +################################################################################ +# Initialize variables +################################################################################ + +# Substitute environment variables passed by the client +export %ENV% + +if [ -z "$MASTER_HOST" ]; then + IS_MASTER=true + MASTER_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname` +else + IS_MASTER=false +fi + +HADOOP_VERSION=${HADOOP_VERSION:-0.20.1} +HADOOP_HOME=/usr/local/hadoop-$HADOOP_VERSION +HADOOP_CONF_DIR=$HADOOP_HOME/conf + +function register_auto_shutdown() { + if [ ! -z "$AUTO_SHUTDOWN" ]; then + shutdown -h +$AUTO_SHUTDOWN >/dev/null & + fi +} + +# Install a list of packages on debian or redhat as appropriate +function install_packages() { + if which dpkg &> /dev/null; then + apt-get update + apt-get -y install $@ + elif which rpm &> /dev/null; then + yum install -y $@ + else + echo "No package manager found." + fi +} + +# Install any user packages specified in the USER_PACKAGES environment variable +function install_user_packages() { + if [ ! -z "$USER_PACKAGES" ]; then + install_packages $USER_PACKAGES + fi +} + +function install_hadoop() { + useradd hadoop + + hadoop_tar_url=http://s3.amazonaws.com/hadoop-releases/core/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz + hadoop_tar_file=`basename $hadoop_tar_url` + hadoop_tar_md5_file=`basename $hadoop_tar_url.md5` + + curl="curl --retry 3 --silent --show-error --fail" + for i in `seq 1 3`; + do + $curl -O $hadoop_tar_url + $curl -O $hadoop_tar_url.md5 + if md5sum -c $hadoop_tar_md5_file; then + break; + else + rm -f $hadoop_tar_file $hadoop_tar_md5_file + fi + done + + if [ ! -e $hadoop_tar_file ]; then + echo "Failed to download $hadoop_tar_url. Aborting." + exit 1 + fi + + tar zxf $hadoop_tar_file -C /usr/local + rm -f $hadoop_tar_file $hadoop_tar_md5_file + + echo "export HADOOP_HOME=$HADOOP_HOME" >> ~root/.bashrc + echo 'export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH' >> ~root/.bashrc +} + +function prep_disk() { + mount=$1 + device=$2 + automount=${3:-false} + + echo "warning: ERASING CONTENTS OF $device" + mkfs.xfs -f $device + if [ ! -e $mount ]; then + mkdir $mount + fi + mount -o defaults,noatime $device $mount + if $automount ; then + echo "$device $mount xfs defaults,noatime 0 0" >> /etc/fstab + fi +} + +function wait_for_mount { + mount=$1 + device=$2 + + mkdir $mount + + i=1 + echo "Attempting to mount $device" + while true ; do + sleep 10 + echo -n "$i " + i=$[$i+1] + mount -o defaults,noatime $device $mount || continue + echo " Mounted." + break; + done +} + +function make_hadoop_dirs { + for mount in "$@"; do + if [ ! 
-e $mount/hadoop ]; then + mkdir -p $mount/hadoop + chown hadoop:hadoop $mount/hadoop + fi + done +} + +# Configure Hadoop by setting up disks and site file +function configure_hadoop() { + + install_packages xfsprogs # needed for XFS + + INSTANCE_TYPE=`wget -q -O - http://169.254.169.254/latest/meta-data/instance-type` + + if [ -n "$EBS_MAPPINGS" ]; then + # EBS_MAPPINGS is like "/ebs1,/dev/sdj;/ebs2,/dev/sdk" + DFS_NAME_DIR='' + FS_CHECKPOINT_DIR='' + DFS_DATA_DIR='' + for mapping in $(echo "$EBS_MAPPINGS" | tr ";" "\n"); do + # Split on the comma (see "Parameter Expansion" in the bash man page) + mount=${mapping%,*} + device=${mapping#*,} + wait_for_mount $mount $device + DFS_NAME_DIR=${DFS_NAME_DIR},"$mount/hadoop/hdfs/name" + FS_CHECKPOINT_DIR=${FS_CHECKPOINT_DIR},"$mount/hadoop/hdfs/secondary" + DFS_DATA_DIR=${DFS_DATA_DIR},"$mount/hadoop/hdfs/data" + FIRST_MOUNT=${FIRST_MOUNT-$mount} + make_hadoop_dirs $mount + done + # Remove leading commas + DFS_NAME_DIR=${DFS_NAME_DIR#?} + FS_CHECKPOINT_DIR=${FS_CHECKPOINT_DIR#?} + DFS_DATA_DIR=${DFS_DATA_DIR#?} + + DFS_REPLICATION=3 # EBS is internally replicated, but we also use HDFS replication for safety + else + case $INSTANCE_TYPE in + m1.xlarge|c1.xlarge) + DFS_NAME_DIR=/mnt/hadoop/hdfs/name,/mnt2/hadoop/hdfs/name + FS_CHECKPOINT_DIR=/mnt/hadoop/hdfs/secondary,/mnt2/hadoop/hdfs/secondary + DFS_DATA_DIR=/mnt/hadoop/hdfs/data,/mnt2/hadoop/hdfs/data,/mnt3/hadoop/hdfs/data,/mnt4/hadoop/hdfs/data + ;; + m1.large) + DFS_NAME_DIR=/mnt/hadoop/hdfs/name,/mnt2/hadoop/hdfs/name + FS_CHECKPOINT_DIR=/mnt/hadoop/hdfs/secondary,/mnt2/hadoop/hdfs/secondary + DFS_DATA_DIR=/mnt/hadoop/hdfs/data,/mnt2/hadoop/hdfs/data + ;; + *) + # "m1.small" or "c1.medium" + DFS_NAME_DIR=/mnt/hadoop/hdfs/name + FS_CHECKPOINT_DIR=/mnt/hadoop/hdfs/secondary + DFS_DATA_DIR=/mnt/hadoop/hdfs/data + ;; + esac + FIRST_MOUNT=/mnt + DFS_REPLICATION=3 + fi + + case $INSTANCE_TYPE in + m1.xlarge|c1.xlarge) + prep_disk /mnt2 /dev/sdc true & + disk2_pid=$! + prep_disk /mnt3 /dev/sdd true & + disk3_pid=$! + prep_disk /mnt4 /dev/sde true & + disk4_pid=$! + wait $disk2_pid $disk3_pid $disk4_pid + MAPRED_LOCAL_DIR=/mnt/hadoop/mapred/local,/mnt2/hadoop/mapred/local,/mnt3/hadoop/mapred/local,/mnt4/hadoop/mapred/local + MAX_MAP_TASKS=8 + MAX_REDUCE_TASKS=4 + CHILD_OPTS=-Xmx680m + CHILD_ULIMIT=1392640 + ;; + m1.large) + prep_disk /mnt2 /dev/sdc true + MAPRED_LOCAL_DIR=/mnt/hadoop/mapred/local,/mnt2/hadoop/mapred/local + MAX_MAP_TASKS=4 + MAX_REDUCE_TASKS=2 + CHILD_OPTS=-Xmx1024m + CHILD_ULIMIT=2097152 + ;; + c1.medium) + MAPRED_LOCAL_DIR=/mnt/hadoop/mapred/local + MAX_MAP_TASKS=4 + MAX_REDUCE_TASKS=2 + CHILD_OPTS=-Xmx550m + CHILD_ULIMIT=1126400 + ;; + *) + # "m1.small" + MAPRED_LOCAL_DIR=/mnt/hadoop/mapred/local + MAX_MAP_TASKS=2 + MAX_REDUCE_TASKS=1 + CHILD_OPTS=-Xmx550m + CHILD_ULIMIT=1126400 + ;; + esac + + make_hadoop_dirs `ls -d /mnt*` + + # Create tmp directory + mkdir /mnt/tmp + chmod a+rwxt /mnt/tmp + + ############################################################################## + # Modify this section to customize your Hadoop cluster. 
+ ############################################################################## + cat > $HADOOP_CONF_DIR/hadoop-site.xml < + + + + dfs.block.size + 134217728 + true + + + dfs.data.dir + $DFS_DATA_DIR + true + + + dfs.datanode.du.reserved + 1073741824 + true + + + dfs.datanode.handler.count + 3 + true + + + + + dfs.name.dir + $DFS_NAME_DIR + true + + + dfs.namenode.handler.count + 5 + true + + + dfs.permissions + true + true + + + dfs.replication + $DFS_REPLICATION + + + fs.checkpoint.dir + $FS_CHECKPOINT_DIR + true + + + fs.default.name + hdfs://$MASTER_HOST:8020/ + + + fs.trash.interval + 1440 + true + + + hadoop.tmp.dir + /mnt/tmp/hadoop-\${user.name} + true + + + io.file.buffer.size + 65536 + + + mapred.child.java.opts + $CHILD_OPTS + + + mapred.child.ulimit + $CHILD_ULIMIT + true + + + mapred.job.tracker + $MASTER_HOST:8021 + + + mapred.job.tracker.handler.count + 5 + true + + + mapred.local.dir + $MAPRED_LOCAL_DIR + true + + + mapred.map.tasks.speculative.execution + true + + + mapred.reduce.parallel.copies + 10 + + + mapred.reduce.tasks + 10 + + + mapred.reduce.tasks.speculative.execution + false + + + mapred.submit.replication + 10 + + + mapred.system.dir + /hadoop/system/mapred + + + mapred.tasktracker.map.tasks.maximum + $MAX_MAP_TASKS + true + + + mapred.tasktracker.reduce.tasks.maximum + $MAX_REDUCE_TASKS + true + + + tasktracker.http.threads + 46 + true + + + mapred.compress.map.output + true + + + mapred.output.compression.type + BLOCK + + + hadoop.rpc.socket.factory.class.default + org.apache.hadoop.net.StandardSocketFactory + true + + + hadoop.rpc.socket.factory.class.ClientProtocol + + true + + + hadoop.rpc.socket.factory.class.JobSubmissionProtocol + + true + + + io.compression.codecs + org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec + + + fs.s3.awsAccessKeyId + $AWS_ACCESS_KEY_ID + + + fs.s3.awsSecretAccessKey + $AWS_SECRET_ACCESS_KEY + + + fs.s3n.awsAccessKeyId + $AWS_ACCESS_KEY_ID + + + fs.s3n.awsSecretAccessKey + $AWS_SECRET_ACCESS_KEY + + +EOF + + # Keep PID files in a non-temporary directory + sed -i -e "s|# export HADOOP_PID_DIR=.*|export HADOOP_PID_DIR=/var/run/hadoop|" \ + $HADOOP_CONF_DIR/hadoop-env.sh + mkdir -p /var/run/hadoop + chown -R hadoop:hadoop /var/run/hadoop + + # Set SSH options within the cluster + sed -i -e 's|# export HADOOP_SSH_OPTS=.*|export HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no"|' \ + $HADOOP_CONF_DIR/hadoop-env.sh + + # Hadoop logs should be on the /mnt partition + sed -i -e 's|# export HADOOP_LOG_DIR=.*|export HADOOP_LOG_DIR=/var/log/hadoop/logs|' \ + $HADOOP_CONF_DIR/hadoop-env.sh + rm -rf /var/log/hadoop + mkdir /mnt/hadoop/logs + chown hadoop:hadoop /mnt/hadoop/logs + ln -s /mnt/hadoop/logs /var/log/hadoop + chown -R hadoop:hadoop /var/log/hadoop + +} + +# Sets up small website on cluster. +function setup_web() { + + if which dpkg &> /dev/null; then + apt-get -y install thttpd + WWW_BASE=/var/www + elif which rpm &> /dev/null; then + yum install -y thttpd + chkconfig --add thttpd + WWW_BASE=/var/www/thttpd/html + fi + + cat > $WWW_BASE/index.html << END + + +Hadoop EC2 Cluster + + +

+<h1>Hadoop EC2 Cluster</h1>

+To browse the cluster you need to have a proxy configured. +Start the proxy with hadoop-ec2 proxy <cluster_name>, +and point your browser to +this Proxy +Auto-Configuration (PAC) file. To manage multiple proxy configurations, +you may wish to use +FoxyProxy. + + + +END + + service thttpd start + +} + +function start_hadoop_master() { + if which dpkg &> /dev/null; then + AS_HADOOP="su -s /bin/bash - hadoop -c" + elif which rpm &> /dev/null; then + AS_HADOOP="/sbin/runuser -s /bin/bash - hadoop -c" + fi + + # Format HDFS + [ ! -e $FIRST_MOUNT/hadoop/hdfs ] && $AS_HADOOP "$HADOOP_HOME/bin/hadoop namenode -format" + + $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start namenode" + $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start secondarynamenode" + $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start jobtracker" + + $AS_HADOOP "$HADOOP_HOME/bin/hadoop dfsadmin -safemode wait" + $AS_HADOOP "$HADOOP_HOME/bin/hadoop fs -mkdir /user" + # The following is questionable, as it allows a user to delete another user + # It's needed to allow users to create their own user directories + $AS_HADOOP "$HADOOP_HOME/bin/hadoop fs -chmod +w /user" + +} + +function start_hadoop_slave() { + if which dpkg &> /dev/null; then + AS_HADOOP="su -s /bin/bash - hadoop -c" + elif which rpm &> /dev/null; then + AS_HADOOP="/sbin/runuser -s /bin/bash - hadoop -c" + fi + + $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start datanode" + $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start tasktracker" +} + +register_auto_shutdown +install_user_packages +install_hadoop +configure_hadoop + +if $IS_MASTER ; then + setup_web + start_hadoop_master +else + start_hadoop_slave +fi diff --git a/src/contrib/cloud/src/py/hadoop/__init__.py b/src/contrib/cloud/src/py/hadoop/__init__.py new file mode 100644 index 0000000000..13878a13a7 --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/src/contrib/cloud/src/py/hadoop/cloud/__init__.py b/src/contrib/cloud/src/py/hadoop/cloud/__init__.py new file mode 100644 index 0000000000..13878a13a7 --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop/cloud/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/src/contrib/cloud/src/py/hadoop/cloud/cli.py b/src/contrib/cloud/src/py/hadoop/cloud/cli.py new file mode 100644 index 0000000000..675cffcc4c --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop/cloud/cli.py @@ -0,0 +1,456 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import with_statement + +import ConfigParser +import hadoop.cloud.commands as commands +from hadoop.cloud.cluster import get_cluster +from hadoop.cloud.cluster import TimeoutException +from hadoop.cloud.providers.ec2 import Ec2Storage +from hadoop.cloud.util import merge_config_with_options +from hadoop.cloud.util import xstr +import logging +from optparse import OptionParser +from optparse import make_option +import os +import subprocess +import sys + +version_file = os.path.join(sys.path[0], "VERSION") +VERSION = open(version_file, "r").read().strip() + +DEFAULT_CLOUD_PROVIDER = 'ec2' + +DEFAULT_CONFIG_DIR_NAME = '.hadoop-cloud' +DEFAULT_CONFIG_DIR = os.path.join(os.environ['HOME'], DEFAULT_CONFIG_DIR_NAME) +CONFIG_FILENAME = 'clusters.cfg' + +CONFIG_DIR_OPTION = \ + make_option("--config-dir", metavar="CONFIG-DIR", + help="The configuration directory.") + +PROVIDER_OPTION = \ + make_option("--cloud-provider", metavar="PROVIDER", + help="The cloud provider, e.g. 'ec2' for Amazon EC2.") + +BASIC_OPTIONS = [ + CONFIG_DIR_OPTION, + PROVIDER_OPTION, +] + +LAUNCH_OPTIONS = [ + CONFIG_DIR_OPTION, + PROVIDER_OPTION, + make_option("-a", "--ami", metavar="AMI", + help="The AMI ID of the image to launch. (Amazon EC2 only. Deprecated, use \ +--image-id.)"), + make_option("-e", "--env", metavar="ENV", action="append", + help="An environment variable to pass to instances. \ +(May be specified multiple times.)"), + make_option("-f", "--user-data-file", metavar="URL", + help="The URL of the file containing user data to be made available to \ +instances."), + make_option("--image-id", metavar="ID", + help="The ID of the image to launch."), + make_option("-k", "--key-name", metavar="KEY-PAIR", + help="The key pair to use when launching instances. (Amazon EC2 only.)"), + make_option("-p", "--user-packages", metavar="PACKAGES", + help="A space-separated list of packages to install on instances on start \ +up."), + make_option("-t", "--instance-type", metavar="TYPE", + help="The type of instance to be launched. 
One of m1.small, m1.large, \ +m1.xlarge, c1.medium, or c1.xlarge."), + make_option("-z", "--availability-zone", metavar="ZONE", + help="The availability zone to run the instances in."), + make_option("--auto-shutdown", metavar="TIMEOUT_MINUTES", + help="The time in minutes after launch when an instance will be \ +automatically shut down."), + make_option("--client-cidr", metavar="CIDR", action="append", + help="The CIDR of the client, which is used to allow access through the \ +firewall to the master node. (May be specified multiple times.)"), + make_option("--public-key", metavar="FILE", + help="The public key to authorize on launching instances. (Non-EC2 \ +providers only.)"), +] + +SNAPSHOT_OPTIONS = [ + CONFIG_DIR_OPTION, + PROVIDER_OPTION, + make_option("-k", "--key-name", metavar="KEY-PAIR", + help="The key pair to use when launching instances."), + make_option("-z", "--availability-zone", metavar="ZONE", + help="The availability zone to run the instances in."), + make_option("--ssh-options", metavar="SSH-OPTIONS", + help="SSH options to use."), +] + +PLACEMENT_OPTIONS = [ + CONFIG_DIR_OPTION, + PROVIDER_OPTION, + make_option("-z", "--availability-zone", metavar="ZONE", + help="The availability zone to run the instances in."), +] + +FORCE_OPTIONS = [ + CONFIG_DIR_OPTION, + PROVIDER_OPTION, + make_option("--force", action="store_true", default=False, + help="Do not ask for confirmation."), +] + +SSH_OPTIONS = [ + CONFIG_DIR_OPTION, + PROVIDER_OPTION, + make_option("--ssh-options", metavar="SSH-OPTIONS", + help="SSH options to use."), +] + +def print_usage(script): + print """Usage: %(script)s COMMAND [OPTIONS] +where COMMAND and [OPTIONS] may be one of: + list [CLUSTER] list all running Hadoop clusters + or instances in CLUSTER + launch-master CLUSTER launch or find a master in CLUSTER + launch-slaves CLUSTER NUM_SLAVES launch NUM_SLAVES slaves in CLUSTER + launch-cluster CLUSTER NUM_SLAVES launch a master and NUM_SLAVES slaves + in CLUSTER + create-formatted-snapshot CLUSTER create an empty, formatted snapshot of + SIZE size SIZE GiB + list-storage CLUSTER list storage volumes for CLUSTER + create-storage CLUSTER ROLE create volumes for NUM_INSTANCES instances + NUM_INSTANCES SPEC_FILE in ROLE for CLUSTER, using SPEC_FILE + attach-storage ROLE attach storage volumes for ROLE to CLUSTER + login CLUSTER log in to the master in CLUSTER over SSH + proxy CLUSTER start a SOCKS proxy on localhost into the + CLUSTER + push CLUSTER FILE scp FILE to the master in CLUSTER + exec CLUSTER CMD execute CMD on the master in CLUSTER + terminate-cluster CLUSTER terminate all instances in CLUSTER + delete-cluster CLUSTER delete the group information for CLUSTER + delete-storage CLUSTER delete all storage volumes for CLUSTER + update-slaves-file CLUSTER update the slaves file on the CLUSTER + master + +Use %(script)s COMMAND --help to see additional options for specific commands. +""" % locals() + +def parse_options_and_config(command, option_list=[], extra_arguments=(), + unbounded_args=False): + """ + Parse the arguments to command using the given option list, and combine with + any configuration parameters. + + If unbounded_args is true then there must be at least as many extra arguments + as specified by extra_arguments (the first argument is always CLUSTER). + Otherwise there must be exactly the same number of arguments as + extra_arguments. 
+ """ + expected_arguments = ["CLUSTER",] + expected_arguments.extend(extra_arguments) + (options_dict, args) = parse_options(command, option_list, expected_arguments, + unbounded_args) + + config_dir = get_config_dir(options_dict) + config_files = [os.path.join(config_dir, CONFIG_FILENAME)] + if 'config_dir' not in options_dict: + # if config_dir not set, then also search in current directory + config_files.insert(0, CONFIG_FILENAME) + + config = ConfigParser.ConfigParser() + read_files = config.read(config_files) + logging.debug("Read %d configuration files: %s", len(read_files), + ", ".join(read_files)) + cluster_name = args[0] + opt = merge_config_with_options(cluster_name, config, options_dict) + logging.debug("Options: %s", str(opt)) + return (opt, args, get_cluster(get_cloud_provider(opt))(cluster_name, + config_dir)) + +def parse_options(command, option_list=[], expected_arguments=(), + unbounded_args=False): + """ + Parse the arguments to command using the given option list. + + If unbounded_args is true then there must be at least as many extra arguments + as specified by extra_arguments (the first argument is always CLUSTER). + Otherwise there must be exactly the same number of arguments as + extra_arguments. + """ + + config_file_name = "%s/%s" % (DEFAULT_CONFIG_DIR_NAME, CONFIG_FILENAME) + usage = """%%prog %s [options] %s + +Options may also be specified in a configuration file called +%s located in the user's home directory. +Options specified on the command line take precedence over any in the +configuration file.""" % (command, " ".join(expected_arguments), + config_file_name) + parser = OptionParser(usage=usage, version="%%prog %s" % VERSION, + option_list=option_list) + parser.disable_interspersed_args() + (options, args) = parser.parse_args(sys.argv[2:]) + if unbounded_args: + if len(args) < len(expected_arguments): + parser.error("incorrect number of arguments") + elif len(args) != len(expected_arguments): + parser.error("incorrect number of arguments") + return (vars(options), args) + +def get_config_dir(options_dict): + config_dir = options_dict.get('config_dir') + if not config_dir: + config_dir = DEFAULT_CONFIG_DIR + return config_dir + +def get_cloud_provider(options_dict): + provider = options_dict.get("cloud_provider", None) + if provider is None: + provider = DEFAULT_CLOUD_PROVIDER + return provider + +def check_options_set(options, option_names): + for option_name in option_names: + if options.get(option_name) is None: + print "Option '%s' is missing. Aborting." % option_name + sys.exit(1) + +def check_launch_options_set(cluster, options): + if cluster.get_provider_code() == 'ec2': + if options.get('ami') is None and options.get('image_id') is None: + print "One of ami or image_id must be specified. Aborting." + sys.exit(1) + check_options_set(options, ['key_name']) + else: + check_options_set(options, ['image_id', 'public_key']) + +def get_image_id(cluster, options): + if cluster.get_provider_code() == 'ec2': + return options.get('image_id', options.get('ami')) + else: + return options.get('image_id') + +def _prompt(prompt): + """ Returns true if user responds "yes" to prompt. """ + return raw_input("%s [yes or no]: " % prompt).lower() == "yes" + +def main(): + # Use HADOOP_CLOUD_LOGGING_LEVEL=DEBUG to enable debugging output. 
+ logging.basicConfig(level=getattr(logging, + os.getenv("HADOOP_CLOUD_LOGGING_LEVEL", + "INFO"))) + + if len(sys.argv) < 2: + print_usage(sys.argv[0]) + sys.exit(1) + + command = sys.argv[1] + + if command == 'list': + (opt, args) = parse_options(command, BASIC_OPTIONS, unbounded_args=True) + if len(args) == 0: + commands.list_all(get_cloud_provider(opt)) + else: + (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS) + commands.list_cluster(cluster) + + elif command == 'launch-master': + (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS) + check_launch_options_set(cluster, opt) + config_dir = get_config_dir(opt) + commands.launch_master(cluster, config_dir, get_image_id(cluster, opt), + opt.get('instance_type'), + opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'), + opt.get('availability_zone'), opt.get('user_packages'), + opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr')) + commands.attach_storage(cluster, (commands.MASTER,)) + try: + commands.wait_for_hadoop(cluster, 0) + except TimeoutException: + print "Timeout while waiting for Hadoop to start. Please check logs on" +\ + " master." + commands.print_master_url(cluster) + + elif command == 'launch-slaves': + (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS, + ("NUM_SLAVES",)) + number_of_slaves = int(args[1]) + check_launch_options_set(cluster, opt) + config_dir = get_config_dir(opt) + commands.launch_slaves(cluster, number_of_slaves, get_image_id(cluster, opt), + opt.get('instance_type'), + opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'), + opt.get('availability_zone'), opt.get('user_packages'), + opt.get('auto_shutdown'), opt.get('env')) + commands.attach_storage(cluster, (commands.SLAVE,)) + commands.print_master_url(cluster) + + elif command == 'launch-cluster': + (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS, + ("NUM_SLAVES",)) + number_of_slaves = int(args[1]) + check_launch_options_set(cluster, opt) + config_dir = get_config_dir(opt) + commands.launch_master(cluster, config_dir, get_image_id(cluster, opt), + opt.get('instance_type'), + opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'), + opt.get('availability_zone'), opt.get('user_packages'), + opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr')) + commands.launch_slaves(cluster, number_of_slaves, get_image_id(cluster, opt), + opt.get('instance_type'), + opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'), + opt.get('availability_zone'), opt.get('user_packages'), + opt.get('auto_shutdown'), opt.get('env')) + commands.attach_storage(cluster, commands.ROLES) + try: + commands.wait_for_hadoop(cluster, number_of_slaves) + except TimeoutException: + print "Timeout while waiting for Hadoop to start. Please check logs on" +\ + " cluster." 
+ commands.print_master_url(cluster) + + elif command == 'login': + (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS) + instances = cluster.check_running(commands.MASTER, 1) + if not instances: + sys.exit(1) + subprocess.call('ssh %s root@%s' % \ + (xstr(opt.get('ssh_options')), instances[0].public_ip), + shell=True) + + elif command == 'proxy': + (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS) + instances = cluster.check_running(commands.MASTER, 1) + if not instances: + sys.exit(1) + options = '-o "ConnectTimeout 10" -o "ServerAliveInterval 60" ' \ + '-N -D 6666' + process = subprocess.Popen('ssh %s %s root@%s' % + (xstr(opt.get('ssh_options')), options, instances[0].public_ip), + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + shell=True) + print """export HADOOP_CLOUD_PROXY_PID=%s; +echo Proxy pid %s;""" % (process.pid, process.pid) + + elif command == 'push': + (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS, + ("FILE",)) + instances = cluster.check_running(commands.MASTER, 1) + if not instances: + sys.exit(1) + subprocess.call('scp %s -r %s root@%s:' % (xstr(opt.get('ssh_options')), + args[1], instances[0].public_ip), + shell=True) + + elif command == 'exec': + (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS, + ("CMD",), True) + instances = cluster.check_running(commands.MASTER, 1) + if not instances: + sys.exit(1) + subprocess.call("ssh %s root@%s '%s'" % (xstr(opt.get('ssh_options')), + instances[0].public_ip, + " ".join(args[1:])), shell=True) + + elif command == 'terminate-cluster': + (opt, args, cluster) = parse_options_and_config(command, FORCE_OPTIONS) + cluster.print_status(commands.ROLES) + if not opt["force"] and not _prompt("Terminate all instances?"): + print "Not terminating cluster." 
+ else: + print "Terminating cluster" + cluster.terminate() + + elif command == 'delete-cluster': + (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS) + cluster.delete() + + elif command == 'create-formatted-snapshot': + (opt, args, cluster) = parse_options_and_config(command, SNAPSHOT_OPTIONS, + ("SIZE",)) + size = int(args[1]) + check_options_set(opt, ['availability_zone', 'key_name']) + ami_ubuntu_intrepid_x86 = 'ami-ec48af85' # use a general AMI + Ec2Storage.create_formatted_snapshot(cluster, size, + opt.get('availability_zone'), + ami_ubuntu_intrepid_x86, + opt.get('key_name'), + xstr(opt.get('ssh_options'))) + + elif command == 'list-storage': + (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS) + storage = cluster.get_storage() + storage.print_status(commands.ROLES) + + elif command == 'create-storage': + (opt, args, cluster) = parse_options_and_config(command, PLACEMENT_OPTIONS, + ("ROLE", "NUM_INSTANCES", + "SPEC_FILE")) + storage = cluster.get_storage() + role = args[1] + number_of_instances = int(args[2]) + spec_file = args[3] + check_options_set(opt, ['availability_zone']) + storage.create(role, number_of_instances, opt.get('availability_zone'), + spec_file) + storage.print_status(commands.ROLES) + + elif command == 'attach-storage': + (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS, + ("ROLE",)) + storage = cluster.get_storage() + role = args[1] + storage.attach(role, cluster.get_instances_in_role(role, 'running')) + storage.print_status(commands.ROLES) + + elif command == 'delete-storage': + (opt, args, cluster) = parse_options_and_config(command, FORCE_OPTIONS) + storage = cluster.get_storage() + storage.print_status(commands.ROLES) + if not opt["force"] and not _prompt("Delete all storage volumes? THIS WILL \ + PERMANENTLY DELETE ALL DATA"): + print "Not deleting storage volumes." + else: + print "Deleting storage" + for role in commands.ROLES: + storage.delete(role) + + elif command == 'update-slaves-file': + (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS) + check_options_set(opt, ['private_key']) + ssh_options = xstr(opt.get('ssh_options')) + instances = cluster.check_running(commands.MASTER, 1) + if not instances: + sys.exit(1) + master = instances[0] + slaves = cluster.get_instances_in_role(commands.SLAVE) + with open('slaves', 'w') as f: + for slave in slaves: + f.write(slave.public_ip + "\n") + subprocess.call('scp %s -r %s root@%s:/etc/hadoop/conf' % \ + (ssh_options, 'slaves', master.public_ip), shell=True) + + # Copy private key + private_key = opt.get('private_key') + subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \ + (ssh_options, private_key, master.public_ip), shell=True) + for slave in slaves: + subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \ + (ssh_options, private_key, slave.public_ip), shell=True) + + else: + print "Unrecognized command '%s'" % command + print_usage(sys.argv[0]) + sys.exit(1) diff --git a/src/contrib/cloud/src/py/hadoop/cloud/cluster.py b/src/contrib/cloud/src/py/hadoop/cloud/cluster.py new file mode 100644 index 0000000000..63e177511a --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop/cloud/cluster.py @@ -0,0 +1,186 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Classes for controlling a cluster of cloud instances. +""" + +from __future__ import with_statement + +import gzip +import StringIO +import urllib + +from hadoop.cloud.storage import Storage + +CLUSTER_PROVIDER_MAP = { + "dummy": ('hadoop.cloud.providers.dummy', 'DummyCluster'), + "ec2": ('hadoop.cloud.providers.ec2', 'Ec2Cluster'), +} + +def get_cluster(provider): + """ + Retrieve the Cluster class for a provider. + """ + mod_name, driver_name = CLUSTER_PROVIDER_MAP[provider] + _mod = __import__(mod_name, globals(), locals(), [driver_name]) + return getattr(_mod, driver_name) + +class Cluster(object): + """ + A cluster of server instances. A cluster has a unique name. + One may launch instances which run in a certain role. + """ + + def __init__(self, name, config_dir): + self.name = name + self.config_dir = config_dir + + def get_provider_code(self): + """ + The code that uniquely identifies the cloud provider. + """ + raise Exception("Unimplemented") + + def authorize_role(self, role, from_port, to_port, cidr_ip): + """ + Authorize access to machines in a given role from a given network. + """ + pass + + def get_instances_in_role(self, role, state_filter=None): + """ + Get all the instances in a role, filtered by state. + + @param role: the name of the role + @param state_filter: the state that the instance should be in + (e.g. "running"), or None for all states + """ + raise Exception("Unimplemented") + + def print_status(self, roles, state_filter="running"): + """ + Print the status of instances in the given roles, filtered by state. + """ + pass + + def check_running(self, role, number): + """ + Check that a certain number of instances in a role are running. + """ + instances = self.get_instances_in_role(role, "running") + if len(instances) != number: + print "Expected %s instances in role %s, but was %s %s" % \ + (number, role, len(instances), instances) + return False + else: + return instances + + def launch_instances(self, role, number, image_id, size_id, + instance_user_data, **kwargs): + """ + Launch instances (of the given role) in the cluster. + Returns a list of IDs for the instances started. + """ + pass + + def wait_for_instances(self, instance_ids, timeout=600): + """ + Wait for instances to start. + Raise TimeoutException if the timeout is exceeded. + """ + pass + + def terminate(self): + """ + Terminate all instances in the cluster. + """ + pass + + def delete(self): + """ + Delete the cluster permanently. This operation is only permitted if no + instances are running. + """ + pass + + def get_storage(self): + """ + Return the external storage for the cluster. + """ + return Storage(self) + +class InstanceUserData(object): + """ + The data passed to an instance on start up. + """ + + def __init__(self, filename, replacements={}): + self.filename = filename + self.replacements = replacements + + def _read_file(self, filename): + """ + Read the user data. 
+ """ + return urllib.urlopen(filename).read() + + def read(self): + """ + Read the user data, making replacements. + """ + contents = self._read_file(self.filename) + for (match, replacement) in self.replacements.iteritems(): + if replacement == None: + replacement = '' + contents = contents.replace(match, replacement) + return contents + + def read_as_gzip_stream(self): + """ + Read and compress the data. + """ + output = StringIO.StringIO() + compressed = gzip.GzipFile(mode='wb', fileobj=output) + compressed.write(self.read()) + compressed.close() + return output.getvalue() + +class Instance(object): + """ + A server instance. + """ + def __init__(self, id, public_ip, private_ip): + self.id = id + self.public_ip = public_ip + self.private_ip = private_ip + +class RoleSyntaxException(Exception): + """ + Raised when a role name is invalid. Role names may consist of a sequence + of alphanumeric characters and underscores. Dashes are not permitted in role + names. + """ + def __init__(self, message): + super(RoleSyntaxException, self).__init__() + self.message = message + def __str__(self): + return repr(self.message) + +class TimeoutException(Exception): + """ + Raised when a timeout is exceeded. + """ + pass diff --git a/src/contrib/cloud/src/py/hadoop/cloud/commands.py b/src/contrib/cloud/src/py/hadoop/cloud/commands.py new file mode 100644 index 0000000000..f0ad7f83f5 --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop/cloud/commands.py @@ -0,0 +1,261 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""High-level commands that a user may want to run""" + +from __future__ import with_statement + +from hadoop.cloud.cluster import get_cluster +from hadoop.cloud.cluster import InstanceUserData +from hadoop.cloud.cluster import TimeoutException +from hadoop.cloud.util import build_env_string +from hadoop.cloud.util import url_get +import logging +import os +import re +import socket +import sys +import time + +logger = logging.getLogger(__name__) + +MASTER = "master" +SLAVE = "slave" +# ROLES contains the set of all roles in a cluster. It may be expanded in the +# future to support HBase, or split namnode and jobtracker instances. 
+ROLES = (MASTER, SLAVE) + +def _get_default_user_data_file_template(cluster): + return os.path.join(sys.path[0], 'hadoop-%s-init-remote.sh' % + cluster.get_provider_code()) + +def list_all(provider): + """ + Find and print clusters that have a running 'master' instance + """ + clusters = get_cluster(provider).get_clusters_with_role(MASTER) + if not clusters: + print "No running clusters" + else: + for cluster in clusters: + print cluster + +def list_cluster(cluster): + cluster.print_status(ROLES) + +def launch_master(cluster, config_dir, image_id, size_id, key_name, public_key, + user_data_file_template=None, placement=None, + user_packages=None, auto_shutdown=None, env_strings=[], + client_cidrs=[]): + if user_data_file_template == None: + user_data_file_template = _get_default_user_data_file_template(cluster) + if cluster.check_running(MASTER, 0) == False: + return # don't proceed if another master is running + ebs_mappings = '' + storage = cluster.get_storage() + if storage.has_any_storage((MASTER,)): + ebs_mappings = storage.get_mappings_string_for_role(MASTER) + replacements = { "%ENV%": build_env_string(env_strings, { + "USER_PACKAGES": user_packages, + "AUTO_SHUTDOWN": auto_shutdown, + "EBS_MAPPINGS": ebs_mappings + }) } + instance_user_data = InstanceUserData(user_data_file_template, replacements) + instance_ids = cluster.launch_instances(MASTER, 1, image_id, size_id, + instance_user_data, + key_name=key_name, + public_key=public_key, + placement=placement) + print "Waiting for master to start (%s)" % str(instance_ids[0]) + try: + cluster.wait_for_instances(instance_ids) + print "Master started" + except TimeoutException: + print "Timeout while waiting for master instance to start." + return + cluster.print_status((MASTER,)) + master = cluster.check_running(MASTER, 1)[0] + _authorize_client_ports(cluster, master, client_cidrs) + _create_client_hadoop_site_file(cluster, config_dir, master) + +def _authorize_client_ports(cluster, master, client_cidrs): + if not client_cidrs: + logger.debug("No client CIDRs specified, using local address.") + client_ip = url_get('http://checkip.amazonaws.com/').strip() + client_cidrs = ("%s/32" % client_ip,) + logger.debug("Client CIDRs: %s", client_cidrs) + for client_cidr in client_cidrs: + # Allow access to port 80 on master from client + cluster.authorize_role(MASTER, 80, 80, client_cidr) + # Allow access to jobtracker UI on master from client + # (so we can see when the cluster is ready) + cluster.authorize_role(MASTER, 50030, 50030, client_cidr) + # Allow access to namenode and jobtracker via public address from master + # node + master_ip = socket.gethostbyname(master.public_ip) + cluster.authorize_role(MASTER, 8020, 8021, "%s/32" % master_ip) + +def _create_client_hadoop_site_file(cluster, config_dir, master): + cluster_dir = os.path.join(config_dir, cluster.name) + aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] + aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] + if not os.path.exists(cluster_dir): + os.makedirs(cluster_dir) + with open(os.path.join(cluster_dir, 'hadoop-site.xml'), 'w') as f: + f.write(""" + + + + + hadoop.job.ugi + root,root + + + fs.default.name + hdfs://%(master)s:8020/ + + + mapred.job.tracker + %(master)s:8021 + + + hadoop.socks.server + localhost:6666 + + + hadoop.rpc.socket.factory.class.default + org.apache.hadoop.net.SocksSocketFactory + + + fs.s3.awsAccessKeyId + %(aws_access_key_id)s + + + fs.s3.awsSecretAccessKey + %(aws_secret_access_key)s + + + fs.s3n.awsAccessKeyId + %(aws_access_key_id)s + + + 
fs.s3n.awsSecretAccessKey + %(aws_secret_access_key)s + + +""" % {'master': master.public_ip, + 'aws_access_key_id': aws_access_key_id, + 'aws_secret_access_key': aws_secret_access_key}) + +def launch_slaves(cluster, number, image_id, size_id, key_name, + public_key, + user_data_file_template=None, placement=None, + user_packages=None, auto_shutdown=None, env_strings=[]): + if user_data_file_template == None: + user_data_file_template = _get_default_user_data_file_template(cluster) + instances = cluster.check_running(MASTER, 1) + if not instances: + return + master = instances[0] + ebs_mappings = '' + storage = cluster.get_storage() + if storage.has_any_storage((SLAVE,)): + ebs_mappings = storage.get_mappings_string_for_role(SLAVE) + replacements = { "%ENV%": build_env_string(env_strings, { + "USER_PACKAGES": user_packages, + "AUTO_SHUTDOWN": auto_shutdown, + "EBS_MAPPINGS": ebs_mappings, + "MASTER_HOST": master.public_ip + }) } + instance_user_data = InstanceUserData(user_data_file_template, replacements) + instance_ids = cluster.launch_instances(SLAVE, number, image_id, size_id, + instance_user_data, + key_name=key_name, + public_key=public_key, + placement=placement) + print "Waiting for slaves to start" + try: + cluster.wait_for_instances(instance_ids) + print "Slaves started" + except TimeoutException: + print "Timeout while waiting for slave instances to start." + return + print + cluster.print_status((SLAVE,)) + +def wait_for_hadoop(cluster, number, timeout=600): + start_time = time.time() + instances = cluster.check_running(MASTER, 1) + if not instances: + return + master = instances[0] + print "Waiting for jobtracker to start" + previous_running = 0 + while True: + if (time.time() - start_time >= timeout): + raise TimeoutException() + try: + actual_running = _number_of_tasktrackers(master.public_ip, 1) + break + except IOError: + pass + sys.stdout.write(".") + sys.stdout.flush() + time.sleep(1) + print + if number > 0: + print "Waiting for %d tasktrackers to start" % number + while actual_running < number: + if (time.time() - start_time >= timeout): + raise TimeoutException() + try: + actual_running = _number_of_tasktrackers(master.public_ip, 5, 2) + if actual_running != previous_running: + sys.stdout.write("%d" % actual_running) + sys.stdout.write(".") + sys.stdout.flush() + time.sleep(1) + previous_running = actual_running + except IOError: + raise TimeoutException() + print + +# The optional ?type=active is a difference between Hadoop 0.18 and 0.20 +NUMBER_OF_TASK_TRACKERS = re.compile( + r'(\d+)') + +def _number_of_tasktrackers(jt_hostname, timeout, retries=0): + jt_page = url_get("http://%s:50030/jobtracker.jsp" % jt_hostname, timeout, + retries) + m = NUMBER_OF_TASK_TRACKERS.search(jt_page) + if m: + return int(m.group(1)) + return 0 + +def print_master_url(cluster): + instances = cluster.check_running(MASTER, 1) + if not instances: + return + master = instances[0] + print "Browse the cluster at http://%s/" % master.public_ip + +def attach_storage(cluster, roles): + storage = cluster.get_storage() + if storage.has_any_storage(roles): + print "Waiting 10 seconds before attaching storage" + time.sleep(10) + for role in roles: + storage.attach(role, cluster.get_instances_in_role(role, 'running')) + storage.print_status(roles) diff --git a/src/contrib/cloud/src/py/hadoop/cloud/providers/__init__.py b/src/contrib/cloud/src/py/hadoop/cloud/providers/__init__.py new file mode 100644 index 0000000000..13878a13a7 --- /dev/null +++ 
b/src/contrib/cloud/src/py/hadoop/cloud/providers/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/src/contrib/cloud/src/py/hadoop/cloud/providers/dummy.py b/src/contrib/cloud/src/py/hadoop/cloud/providers/dummy.py new file mode 100644 index 0000000000..19e4173e03 --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop/cloud/providers/dummy.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +from hadoop.cloud.cluster import Cluster +from hadoop.cloud.cluster import Instance + +logger = logging.getLogger(__name__) + +class DummyCluster(Cluster): + + @staticmethod + def get_clusters_with_role(role, state="running"): + logger.info("get_clusters_with_role(%s, %s)", role, state) + return ["dummy-cluster"] + + def __init__(self, name, config_dir): + super(DummyCluster, self).__init__(name, config_dir) + logger.info("__init__(%s, %s)", name, config_dir) + + def get_provider_code(self): + return "dummy" + + def authorize_role(self, role, from_port, to_port, cidr_ip): + logger.info("authorize_role(%s, %s, %s, %s)", role, from_port, to_port, + cidr_ip) + + def get_instances_in_role(self, role, state_filter=None): + logger.info("get_instances_in_role(%s, %s)", role, state_filter) + return [Instance(1, '127.0.0.1', '127.0.0.1')] + + def print_status(self, roles, state_filter="running"): + logger.info("print_status(%s, %s)", roles, state_filter) + + def launch_instances(self, role, number, image_id, size_id, + instance_user_data, **kwargs): + logger.info("launch_instances(%s, %s, %s, %s, %s, %s)", role, number, + image_id, size_id, instance_user_data, str(kwargs)) + return [1] + + def wait_for_instances(self, instance_ids, timeout=600): + logger.info("wait_for_instances(%s, %s)", instance_ids, timeout) + + def terminate(self): + logger.info("terminate") + + def delete(self): + logger.info("delete") diff --git a/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py b/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py new file mode 100644 index 0000000000..39dd3e709e --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py @@ -0,0 +1,460 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from boto.ec2.connection import EC2Connection +from boto.exception import EC2ResponseError +import logging +from hadoop.cloud.cluster import Cluster +from hadoop.cloud.cluster import Instance +from hadoop.cloud.cluster import RoleSyntaxException +from hadoop.cloud.cluster import TimeoutException +from hadoop.cloud.storage import JsonVolumeManager +from hadoop.cloud.storage import JsonVolumeSpecManager +from hadoop.cloud.storage import MountableVolume +from hadoop.cloud.storage import Storage +from hadoop.cloud.util import xstr +import os +import re +import subprocess +import sys +import time + +logger = logging.getLogger(__name__) + +def _run_command_on_instance(instance, ssh_options, command): + print "Running ssh %s root@%s '%s'" % \ + (ssh_options, instance.public_dns_name, command) + retcode = subprocess.call("ssh %s root@%s '%s'" % + (ssh_options, instance.public_dns_name, command), + shell=True) + print "Command running on %s returned with value %s" % \ + (instance.public_dns_name, retcode) + +def _wait_for_volume(ec2_connection, volume_id): + """ + Waits until a volume becomes available. + """ + while True: + volumes = ec2_connection.get_all_volumes([volume_id,]) + if volumes[0].status == 'available': + break + sys.stdout.write(".") + sys.stdout.flush() + time.sleep(1) + +class Ec2Cluster(Cluster): + """ + A cluster of EC2 instances. A cluster has a unique name. + + Instances running in the cluster run in a security group with the cluster's + name, and also a name indicating the instance's role, e.g. -foo + to show a "foo" instance. + """ + + @staticmethod + def get_clusters_with_role(role, state="running"): + all_instances = EC2Connection().get_all_instances() + clusters = [] + for res in all_instances: + instance = res.instances[0] + for group in res.groups: + if group.id.endswith("-" + role) and instance.state == state: + clusters.append(re.sub("-%s$" % re.escape(role), "", group.id)) + return clusters + + def __init__(self, name, config_dir): + super(Ec2Cluster, self).__init__(name, config_dir) + self.ec2Connection = EC2Connection() + + def get_provider_code(self): + return "ec2" + + def _get_cluster_group_name(self): + return self.name + + def _check_role_name(self, role): + if not re.match("^[a-zA-Z0-9_]+$", role): + raise RoleSyntaxException("Invalid role name '%s'" % role) + + def _group_name_for_role(self, role): + """ + Return the security group name for an instance in a given role. + """ + self._check_role_name(role) + return "%s-%s" % (self.name, role) + + def _get_group_names(self, role): + self._check_role_name(role) + return [self._get_cluster_group_name(), self._group_name_for_role(role)] + + def _get_all_group_names(self): + security_groups = self.ec2Connection.get_all_security_groups() + security_group_names = \ + [security_group.name for security_group in security_groups] + return security_group_names + + def _get_all_group_names_for_cluster(self): + all_group_names = self._get_all_group_names() + r = [] + if self.name not in all_group_names: + return r + for group in all_group_names: + if re.match("^%s(-[a-zA-Z0-9_]+)?$" % self.name, group): + r.append(group) + return r + + def _create_groups(self, role): + """ + Create the security groups for a given role, including a group for the + cluster if it doesn't exist. 
+ """ + self._check_role_name(role) + security_group_names = self._get_all_group_names() + + cluster_group_name = self._get_cluster_group_name() + if not cluster_group_name in security_group_names: + self.ec2Connection.create_security_group(cluster_group_name, + "Cluster (%s)" % (self.name)) + self.ec2Connection.authorize_security_group(cluster_group_name, + cluster_group_name) + # Allow SSH from anywhere + self.ec2Connection.authorize_security_group(cluster_group_name, + ip_protocol="tcp", + from_port=22, to_port=22, + cidr_ip="0.0.0.0/0") + + role_group_name = self._group_name_for_role(role) + if not role_group_name in security_group_names: + self.ec2Connection.create_security_group(role_group_name, + "Role %s (%s)" % (role, self.name)) + + def authorize_role(self, role, from_port, to_port, cidr_ip): + """ + Authorize access to machines in a given role from a given network. + """ + self._check_role_name(role) + role_group_name = self._group_name_for_role(role) + # Revoke first to avoid InvalidPermission.Duplicate error + self.ec2Connection.revoke_security_group(role_group_name, + ip_protocol="tcp", + from_port=from_port, + to_port=to_port, cidr_ip=cidr_ip) + self.ec2Connection.authorize_security_group(role_group_name, + ip_protocol="tcp", + from_port=from_port, + to_port=to_port, + cidr_ip=cidr_ip) + + def _get_instances(self, group_name, state_filter=None): + """ + Get all the instances in a group, filtered by state. + + @param group_name: the name of the group + @param state_filter: the state that the instance should be in + (e.g. "running"), or None for all states + """ + all_instances = self.ec2Connection.get_all_instances() + instances = [] + for res in all_instances: + for group in res.groups: + if group.id == group_name: + for instance in res.instances: + if state_filter == None or instance.state == state_filter: + instances.append(instance) + return instances + + def get_instances_in_role(self, role, state_filter=None): + """ + Get all the instances in a role, filtered by state. + + @param role: the name of the role + @param state_filter: the state that the instance should be in + (e.g. "running"), or None for all states + """ + self._check_role_name(role) + instances = [] + for instance in self._get_instances(self._group_name_for_role(role), + state_filter): + instances.append(Instance(instance.id, instance.dns_name, + instance.private_dns_name)) + return instances + + def _print_instance(self, role, instance): + print "\t".join((role, instance.id, + instance.image_id, + instance.dns_name, instance.private_dns_name, + instance.state, xstr(instance.key_name), instance.instance_type, + str(instance.launch_time), instance.placement)) + + def print_status(self, roles, state_filter="running"): + """ + Print the status of instances in the given roles, filtered by state. 
+ """ + for role in roles: + for instance in self._get_instances(self._group_name_for_role(role), + state_filter): + self._print_instance(role, instance) + + def launch_instances(self, role, number, image_id, size_id, + instance_user_data, **kwargs): + self._check_role_name(role) + + self._create_groups(role) + user_data = instance_user_data.read_as_gzip_stream() + + reservation = self.ec2Connection.run_instances(image_id, min_count=number, + max_count=number, key_name=kwargs.get('key_name', None), + security_groups=self._get_group_names(role), user_data=user_data, + instance_type=size_id, placement=kwargs.get('placement', None)) + return [instance.id for instance in reservation.instances] + + def wait_for_instances(self, instance_ids, timeout=600): + start_time = time.time() + while True: + if (time.time() - start_time >= timeout): + raise TimeoutException() + try: + if self._all_started(self.ec2Connection.get_all_instances(instance_ids)): + break + # don't timeout for race condition where instance is not yet registered + except EC2ResponseError: + pass + sys.stdout.write(".") + sys.stdout.flush() + time.sleep(1) + + def _all_started(self, reservations): + for res in reservations: + for instance in res.instances: + if instance.state != "running": + return False + return True + + def terminate(self): + instances = self._get_instances(self._get_cluster_group_name(), "running") + if instances: + self.ec2Connection.terminate_instances([i.id for i in instances]) + + def delete(self): + """ + Delete the security groups for each role in the cluster, and the group for + the cluster. + """ + group_names = self._get_all_group_names_for_cluster() + for group in group_names: + self.ec2Connection.delete_security_group(group) + + def get_storage(self): + """ + Return the external storage for the cluster. + """ + return Ec2Storage(self) + + +class Ec2Storage(Storage): + """ + Storage volumes for an EC2 cluster. The storage is associated with a named + cluster. Metadata for the storage volumes is kept in a JSON file on the client + machine (in a file called "ec2-storage-.json" in the + configuration directory). + """ + + @staticmethod + def create_formatted_snapshot(cluster, size, availability_zone, image_id, + key_name, ssh_options): + """ + Creates a formatted snapshot of a given size. This saves having to format + volumes when they are first attached. 
+ """ + conn = cluster.ec2Connection + print "Starting instance" + reservation = conn.run_instances(image_id, key_name=key_name, + placement=availability_zone) + instance = reservation.instances[0] + try: + cluster.wait_for_instances([instance.id,]) + print "Started instance %s" % instance.id + except TimeoutException: + print "Timeout" + return + print + print "Waiting 60 seconds before attaching storage" + time.sleep(60) + # Re-populate instance object since it has more details filled in + instance.update() + + print "Creating volume of size %s in %s" % (size, availability_zone) + volume = conn.create_volume(size, availability_zone) + print "Created volume %s" % volume + print "Attaching volume to %s" % instance.id + volume.attach(instance.id, '/dev/sdj') + + _run_command_on_instance(instance, ssh_options, """ + while true ; do + echo 'Waiting for /dev/sdj...'; + if [ -e /dev/sdj ]; then break; fi; + sleep 1; + done; + mkfs.ext3 -F -m 0.5 /dev/sdj + """) + + print "Detaching volume" + conn.detach_volume(volume.id, instance.id) + print "Creating snapshot" + snapshot = volume.create_snapshot() + print "Created snapshot %s" % snapshot.id + _wait_for_volume(conn, volume.id) + print + print "Deleting volume" + volume.delete() + print "Deleted volume" + print "Stopping instance" + terminated = conn.terminate_instances([instance.id,]) + print "Stopped instance %s" % terminated + + def __init__(self, cluster): + super(Ec2Storage, self).__init__(cluster) + self.config_dir = cluster.config_dir + + def _get_storage_filename(self): + return os.path.join(self.config_dir, + "ec2-storage-%s.json" % (self.cluster.name)) + + def create(self, role, number_of_instances, availability_zone, spec_filename): + spec_file = open(spec_filename, 'r') + volume_spec_manager = JsonVolumeSpecManager(spec_file) + volume_manager = JsonVolumeManager(self._get_storage_filename()) + for dummy in range(number_of_instances): + mountable_volumes = [] + volume_specs = volume_spec_manager.volume_specs_for_role(role) + for spec in volume_specs: + logger.info("Creating volume of size %s in %s from snapshot %s" % \ + (spec.size, availability_zone, spec.snapshot_id)) + volume = self.cluster.ec2Connection.create_volume(spec.size, + availability_zone, + spec.snapshot_id) + mountable_volumes.append(MountableVolume(volume.id, spec.mount_point, + spec.device)) + volume_manager.add_instance_storage_for_role(role, mountable_volumes) + + def _get_mountable_volumes(self, role): + storage_filename = self._get_storage_filename() + volume_manager = JsonVolumeManager(storage_filename) + return volume_manager.get_instance_storage_for_role(role) + + def get_mappings_string_for_role(self, role): + mappings = {} + mountable_volumes_list = self._get_mountable_volumes(role) + for mountable_volumes in mountable_volumes_list: + for mountable_volume in mountable_volumes: + mappings[mountable_volume.mount_point] = mountable_volume.device + return ";".join(["%s,%s" % (mount_point, device) for (mount_point, device) + in mappings.items()]) + + def _has_storage(self, role): + return self._get_mountable_volumes(role) + + def has_any_storage(self, roles): + for role in roles: + if self._has_storage(role): + return True + return False + + def _get_ec2_volumes_dict(self, mountable_volumes): + volume_ids = [mv.volume_id for mv in sum(mountable_volumes, [])] + volumes = self.cluster.ec2Connection.get_all_volumes(volume_ids) + volumes_dict = {} + for volume in volumes: + volumes_dict[volume.id] = volume + return volumes_dict + + def _print_volume(self, role, 
volume): + print "\t".join((role, volume.id, str(volume.size), + volume.snapshot_id, volume.availabilityZone, + volume.status, str(volume.create_time), + str(volume.attach_time))) + + def print_status(self, roles): + for role in roles: + mountable_volumes_list = self._get_mountable_volumes(role) + ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list) + for mountable_volumes in mountable_volumes_list: + for mountable_volume in mountable_volumes: + self._print_volume(role, ec2_volumes[mountable_volume.volume_id]) + + def _replace(self, string, replacements): + for (match, replacement) in replacements.iteritems(): + string = string.replace(match, replacement) + return string + + def attach(self, role, instances): + mountable_volumes_list = self._get_mountable_volumes(role) + if not mountable_volumes_list: + return + ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list) + + available_mountable_volumes_list = [] + + available_instances_dict = {} + for instance in instances: + available_instances_dict[instance.id] = instance + + # Iterate over mountable_volumes and retain those that are not attached + # Also maintain a list of instances that have no attached storage + # Note that we do not fill in "holes" (instances that only have some of + # their storage attached) + for mountable_volumes in mountable_volumes_list: + available = True + for mountable_volume in mountable_volumes: + if ec2_volumes[mountable_volume.volume_id].status != 'available': + available = False + attach_data = ec2_volumes[mountable_volume.volume_id].attach_data + instance_id = attach_data.instance_id + if available_instances_dict.has_key(instance_id): + del available_instances_dict[instance_id] + if available: + available_mountable_volumes_list.append(mountable_volumes) + + if len(available_instances_dict) != len(available_mountable_volumes_list): + logger.warning("Number of available instances (%s) and volumes (%s) \ + do not match." \ + % (len(available_instances_dict), + len(available_mountable_volumes_list))) + + for (instance, mountable_volumes) in zip(available_instances_dict.values(), + available_mountable_volumes_list): + print "Attaching storage to %s" % instance.id + for mountable_volume in mountable_volumes: + volume = ec2_volumes[mountable_volume.volume_id] + print "Attaching %s to %s" % (volume.id, instance.id) + volume.attach(instance.id, mountable_volume.device) + + def delete(self, role): + storage_filename = self._get_storage_filename() + volume_manager = JsonVolumeManager(storage_filename) + mountable_volumes_list = volume_manager.get_instance_storage_for_role(role) + ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list) + all_available = True + for volume in ec2_volumes.itervalues(): + if volume.status != 'available': + all_available = False + logger.warning("Volume %s is not available.", volume) + if not all_available: + logger.warning("Some volumes are still in use for role %s.\ + Aborting delete.", role) + return + for volume in ec2_volumes.itervalues(): + volume.delete() + volume_manager.remove_instance_storage_for_role(role) diff --git a/src/contrib/cloud/src/py/hadoop/cloud/storage.py b/src/contrib/cloud/src/py/hadoop/cloud/storage.py new file mode 100644 index 0000000000..7513fdaaee --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop/cloud/storage.py @@ -0,0 +1,163 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Classes for controlling external cluster storage. +""" + +import logging +import simplejson as json + +logger = logging.getLogger(__name__) + +class VolumeSpec(object): + """ + The specification for a storage volume, encapsulating all the information + needed to create a volume and ultimately mount it on an instance. + """ + def __init__(self, size, mount_point, device, snapshot_id): + self.size = size + self.mount_point = mount_point + self.device = device + self.snapshot_id = snapshot_id + + +class JsonVolumeSpecManager(object): + """ + A container for VolumeSpecs. This object can read VolumeSpecs specified in + JSON. + """ + def __init__(self, spec_file): + self.spec = json.load(spec_file) + + def volume_specs_for_role(self, role): + return [VolumeSpec(d["size_gb"], d["mount_point"], d["device"], + d["snapshot_id"]) for d in self.spec[role]] + + def get_mappings_string_for_role(self, role): + """ + Returns a short string of the form + "mount_point1,device1;mount_point2,device2;..." + which is useful for passing as an environment variable. + """ + return ";".join(["%s,%s" % (d["mount_point"], d["device"]) + for d in self.spec[role]]) + + +class MountableVolume(object): + """ + A storage volume that has been created. It may or may not have been attached + or mounted to an instance. + """ + def __init__(self, volume_id, mount_point, device): + self.volume_id = volume_id + self.mount_point = mount_point + self.device = device + + +class JsonVolumeManager(object): + + def __init__(self, filename): + self.filename = filename + + def _load(self): + try: + return json.load(open(self.filename, "r")) + except IOError: + logger.debug("File %s does not exist.", self.filename) + return {} + + def _store(self, obj): + return json.dump(obj, open(self.filename, "w"), sort_keys=True, indent=2) + + def add_instance_storage_for_role(self, role, mountable_volumes): + json_dict = self._load() + mv_dicts = [mv.__dict__ for mv in mountable_volumes] + json_dict.setdefault(role, []).append(mv_dicts) + self._store(json_dict) + + def remove_instance_storage_for_role(self, role): + json_dict = self._load() + del json_dict[role] + self._store(json_dict) + + def get_instance_storage_for_role(self, role): + """ + Returns a list of lists of MountableVolume objects. Each nested list is + the storage for one instance. + """ + try: + json_dict = self._load() + instance_storage = [] + for instance in json_dict[role]: + vols = [] + for vol in instance: + vols.append(MountableVolume(vol["volume_id"], vol["mount_point"], + vol["device"])) + instance_storage.append(vols) + return instance_storage + except KeyError: + return [] + +class Storage(object): + """ + Storage volumes for a cluster. The storage is associated with a named + cluster. Many clusters just have local storage, in which case this is + not used. 
+ """ + + def __init__(self, cluster): + self.cluster = cluster + + def create(self, role, number_of_instances, availability_zone, spec_filename): + """ + Create new storage volumes for instances with the given role, according to + the mapping defined in the spec file. + """ + pass + + def get_mappings_string_for_role(self, role): + """ + Returns a short string of the form + "mount_point1,device1;mount_point2,device2;..." + which is useful for passing as an environment variable. + """ + raise Exception("Unimplemented") + + def has_any_storage(self, roles): + """ + Return True if any of the given roles has associated storage + """ + return False + + def print_status(self, roles): + """ + Print the status of storage volumes for the given roles. + """ + pass + + def attach(self, role, instances): + """ + Attach volumes for a role to instances. Some volumes may already be + attached, in which case they are ignored, and we take care not to attach + multiple volumes to an instance. + """ + pass + + def delete(self, role): + """ + Permanently delete all the storage for a role. + """ + pass diff --git a/src/contrib/cloud/src/py/hadoop/cloud/util.py b/src/contrib/cloud/src/py/hadoop/cloud/util.py new file mode 100644 index 0000000000..dcc6f51711 --- /dev/null +++ b/src/contrib/cloud/src/py/hadoop/cloud/util.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utility functions. +""" + +import ConfigParser +import socket +import urllib2 + +def bash_quote(text): + """Quotes a string for bash, by using single quotes.""" + if text == None: + return "" + return "'%s'" % text.replace("'", "'\\''") + +def bash_quote_env(env): + """Quotes the value in an environment variable assignment.""" + if env.find("=") == -1: + return env + (var, value) = env.split("=") + return "%s=%s" % (var, bash_quote(value)) + +def build_env_string(env_strings=[], pairs={}): + """Build a bash environment variable assignment""" + env = '' + if env_strings: + for env_string in env_strings: + env += "%s " % bash_quote_env(env_string) + if pairs: + for key, val in pairs.items(): + env += "%s=%s " % (key, bash_quote(val)) + return env[:-1] + +def merge_config_with_options(section_name, config, options): + """ + Merge configuration options with a dictionary of options. + Keys in the options dictionary take precedence. + """ + res = {} + try: + for (key, value) in config.items(section_name): + if value.find("\n") != -1: + res[key] = value.split("\n") + else: + res[key] = value + except ConfigParser.NoSectionError: + pass + for key in options: + if options[key] != None: + res[key] = options[key] + return res + +def url_get(url, timeout=10, retries=0): + """ + Retrieve content from the given URL. 
+ """ + # in Python 2.6 we can pass timeout to urllib2.urlopen + socket.setdefaulttimeout(timeout) + attempts = 0 + while True: + try: + return urllib2.urlopen(url).read() + except urllib2.URLError: + attempts = attempts + 1 + if attempts > retries: + raise + +def xstr(string): + """Sane string conversion: return an empty string if string is None.""" + return '' if string is None else str(string) diff --git a/src/contrib/cloud/src/test/hadoop/__init__.py b/src/contrib/cloud/src/test/hadoop/__init__.py new file mode 100644 index 0000000000..13878a13a7 --- /dev/null +++ b/src/contrib/cloud/src/test/hadoop/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/src/contrib/cloud/src/test/hadoop/cloud/__init__.py b/src/contrib/cloud/src/test/hadoop/cloud/__init__.py new file mode 100644 index 0000000000..13878a13a7 --- /dev/null +++ b/src/contrib/cloud/src/test/hadoop/cloud/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/src/contrib/cloud/src/test/hadoop/cloud/alltests.py b/src/contrib/cloud/src/test/hadoop/cloud/alltests.py new file mode 100644 index 0000000000..3eb8341922 --- /dev/null +++ b/src/contrib/cloud/src/test/hadoop/cloud/alltests.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import unittest +from hadoop.cloud.testcluster import TestCluster +from hadoop.cloud.teststorage import TestJsonVolumeSpecManager +from hadoop.cloud.teststorage import TestJsonVolumeManager +from hadoop.cloud.testuserdata import TestInstanceUserData +from hadoop.cloud.testutil import TestUtilFunctions + +def testSuite(): + alltests = unittest.TestSuite([ + unittest.makeSuite(TestCluster, 'test'), + unittest.makeSuite(TestJsonVolumeSpecManager, 'test'), + unittest.makeSuite(TestJsonVolumeManager, 'test'), + unittest.makeSuite(TestInstanceUserData, 'test'), + unittest.makeSuite(TestUtilFunctions, 'test'), + ]) + return alltests + +if __name__ == "__main__": + runner = unittest.TextTestRunner() + sys.exit(not runner.run(testSuite()).wasSuccessful()) diff --git a/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py b/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py new file mode 100644 index 0000000000..69b9bc3d86 --- /dev/null +++ b/src/contrib/cloud/src/test/hadoop/cloud/testcluster.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from hadoop.cloud.cluster import RoleSyntaxException +from hadoop.cloud.providers.ec2 import Ec2Cluster + +class TestCluster(unittest.TestCase): + + def test_group_name_for_role(self): + cluster = Ec2Cluster("test-cluster", None) + self.assertEqual("test-cluster-foo", cluster._group_name_for_role("foo")) + + def test_check_role_name_valid(self): + cluster = Ec2Cluster("test-cluster", None) + cluster._check_role_name( + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_") + + def test_check_role_name_dash_is_invalid(self): + cluster = Ec2Cluster("test-cluster", None) + self.assertRaises(RoleSyntaxException, cluster._check_role_name, "a-b") + +if __name__ == '__main__': + unittest.main() diff --git a/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py b/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py new file mode 100644 index 0000000000..45b43d7a77 --- /dev/null +++ b/src/contrib/cloud/src/test/hadoop/cloud/teststorage.py @@ -0,0 +1,137 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +import simplejson as json +from StringIO import StringIO + +from hadoop.cloud.storage import MountableVolume +from hadoop.cloud.storage import JsonVolumeManager +from hadoop.cloud.storage import JsonVolumeSpecManager + +spec = { + "master": ({"size_gb":"8", "mount_point":"/", "device":"/dev/sdj", + "snapshot_id": "snap_1"}, + ), + "slave": ({"size_gb":"8", "mount_point":"/", "device":"/dev/sdj", + "snapshot_id": "snap_2"}, + {"size_gb":"10", "mount_point":"/data1", "device":"/dev/sdk", + "snapshot_id": "snap_3"}, + ) + } + +class TestJsonVolumeSpecManager(unittest.TestCase): + + def test_volume_specs_for_role(self): + + input = StringIO(json.dumps(spec)) + + volume_spec_manager = JsonVolumeSpecManager(input) + + master_specs = volume_spec_manager.volume_specs_for_role("master") + self.assertEqual(1, len(master_specs)) + self.assertEqual("/", master_specs[0].mount_point) + self.assertEqual("8", master_specs[0].size) + self.assertEqual("/dev/sdj", master_specs[0].device) + self.assertEqual("snap_1", master_specs[0].snapshot_id) + + slave_specs = volume_spec_manager.volume_specs_for_role("slave") + self.assertEqual(2, len(slave_specs)) + self.assertEqual("snap_2", slave_specs[0].snapshot_id) + self.assertEqual("snap_3", slave_specs[1].snapshot_id) + + self.assertRaises(KeyError, volume_spec_manager.volume_specs_for_role, + "no-such-role") + + def test_get_mappings_string_for_role(self): + + input = StringIO(json.dumps(spec)) + + volume_spec_manager = JsonVolumeSpecManager(input) + + master_mappings = volume_spec_manager.get_mappings_string_for_role("master") + self.assertEqual("/,/dev/sdj", master_mappings) + + slave_mappings = volume_spec_manager.get_mappings_string_for_role("slave") + self.assertEqual("/,/dev/sdj;/data1,/dev/sdk", slave_mappings) + + self.assertRaises(KeyError, + volume_spec_manager.get_mappings_string_for_role, + "no-such-role") + +class TestJsonVolumeManager(unittest.TestCase): + + def setUp(self): + try: + os.remove("volumemanagertest.json") + except OSError: + pass + + def test_add_instance_storage_for_role(self): + volume_manager = JsonVolumeManager("volumemanagertest.json") + self.assertEqual(0, + len(volume_manager.get_instance_storage_for_role("master"))) + + volume_manager.add_instance_storage_for_role("master", + [MountableVolume("vol_1", "/", + "/dev/sdj")]) + master_storage = volume_manager.get_instance_storage_for_role("master") + self.assertEqual(1, len(master_storage)) + master_storage_instance0 = master_storage[0] + self.assertEqual(1, len(master_storage_instance0)) + master_storage_instance0_vol0 = master_storage_instance0[0] + self.assertEqual("vol_1", master_storage_instance0_vol0.volume_id) + self.assertEqual("/", master_storage_instance0_vol0.mount_point) + self.assertEqual("/dev/sdj", master_storage_instance0_vol0.device) + + volume_manager.add_instance_storage_for_role("slave", + [MountableVolume("vol_2", "/", + "/dev/sdj")]) + self.assertEqual(1, + len(volume_manager.get_instance_storage_for_role("master"))) + slave_storage = volume_manager.get_instance_storage_for_role("slave") + self.assertEqual(1, 
len(slave_storage)) + slave_storage_instance0 = slave_storage[0] + self.assertEqual(1, len(slave_storage_instance0)) + slave_storage_instance0_vol0 = slave_storage_instance0[0] + self.assertEqual("vol_2", slave_storage_instance0_vol0.volume_id) + self.assertEqual("/", slave_storage_instance0_vol0.mount_point) + self.assertEqual("/dev/sdj", slave_storage_instance0_vol0.device) + + volume_manager.add_instance_storage_for_role("slave", + [MountableVolume("vol_3", "/", "/dev/sdj"), + MountableVolume("vol_4", "/data1", "/dev/sdk")]) + self.assertEqual(1, + len(volume_manager.get_instance_storage_for_role("master"))) + slave_storage = volume_manager.get_instance_storage_for_role("slave") + self.assertEqual(2, len(slave_storage)) + slave_storage_instance0 = slave_storage[0] + slave_storage_instance1 = slave_storage[1] + self.assertEqual(1, len(slave_storage_instance0)) + self.assertEqual(2, len(slave_storage_instance1)) + slave_storage_instance1_vol0 = slave_storage_instance1[0] + slave_storage_instance1_vol1 = slave_storage_instance1[1] + self.assertEqual("vol_3", slave_storage_instance1_vol0.volume_id) + self.assertEqual("/", slave_storage_instance1_vol0.mount_point) + self.assertEqual("/dev/sdj", slave_storage_instance1_vol0.device) + self.assertEqual("vol_4", slave_storage_instance1_vol1.volume_id) + self.assertEqual("/data1", slave_storage_instance1_vol1.mount_point) + self.assertEqual("/dev/sdk", slave_storage_instance1_vol1.device) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py b/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py new file mode 100644 index 0000000000..e6f527bcd4 --- /dev/null +++ b/src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import tempfile +import unittest + +from hadoop.cloud.cluster import InstanceUserData + +class TestInstanceUserData(unittest.TestCase): + + def test_replacement(self): + file = tempfile.NamedTemporaryFile() + file.write("Contents go here") + file.flush() + self.assertEqual("Contents go here", + InstanceUserData(file.name, {}).read()) + self.assertEqual("Contents were here", + InstanceUserData(file.name, { "go": "were"}).read()) + self.assertEqual("Contents here", + InstanceUserData(file.name, { "go": None}).read()) + file.close() + + def test_read_file_url(self): + file = tempfile.NamedTemporaryFile() + file.write("Contents go here") + file.flush() + self.assertEqual("Contents go here", + InstanceUserData("file://%s" % file.name, {}).read()) + file.close() + +if __name__ == '__main__': + unittest.main() diff --git a/src/contrib/cloud/src/test/hadoop/cloud/testutil.py b/src/contrib/cloud/src/test/hadoop/cloud/testutil.py new file mode 100644 index 0000000000..2e3f2281f1 --- /dev/null +++ b/src/contrib/cloud/src/test/hadoop/cloud/testutil.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import ConfigParser +import StringIO +import unittest + +from hadoop.cloud.util import bash_quote +from hadoop.cloud.util import bash_quote_env +from hadoop.cloud.util import build_env_string +from hadoop.cloud.util import merge_config_with_options +from hadoop.cloud.util import xstr + +class TestUtilFunctions(unittest.TestCase): + + def test_bash_quote(self): + self.assertEqual("", bash_quote(None)) + self.assertEqual("''", bash_quote("")) + self.assertEqual("'a'", bash_quote("a")) + self.assertEqual("'a b'", bash_quote("a b")) + self.assertEqual("'a\b'", bash_quote("a\b")) + self.assertEqual("'a '\\'' b'", bash_quote("a ' b")) + + def test_bash_quote_env(self): + self.assertEqual("", bash_quote_env("")) + self.assertEqual("a", bash_quote_env("a")) + self.assertEqual("a='b'", bash_quote_env("a=b")) + self.assertEqual("a='b c'", bash_quote_env("a=b c")) + self.assertEqual("a='b\c'", bash_quote_env("a=b\c")) + self.assertEqual("a='b '\\'' c'", bash_quote_env("a=b ' c")) + + def test_build_env_string(self): + self.assertEqual("", build_env_string()) + self.assertEqual("a='b' c='d'", + build_env_string(env_strings=["a=b", "c=d"])) + self.assertEqual("a='b' c='d'", + build_env_string(pairs={"a": "b", "c": "d"})) + + def test_merge_config_with_options(self): + options = { "a": "b" } + config = ConfigParser.ConfigParser() + self.assertEqual({ "a": "b" }, + merge_config_with_options("section", config, options)) + config.add_section("section") + self.assertEqual({ "a": "b" }, + merge_config_with_options("section", config, options)) + config.set("section", "a", "z") + config.set("section", "c", "d") + self.assertEqual({ "a": "z", "c": "d" }, + merge_config_with_options("section", config, {})) + self.assertEqual({ "a": "b", "c": "d" }, + merge_config_with_options("section", config, options)) + + def test_merge_config_with_options_list(self): + config = ConfigParser.ConfigParser() + config.readfp(StringIO.StringIO("""[section] +env1=a=b + c=d +env2=e=f + g=h""")) + self.assertEqual({ "env1": ["a=b", "c=d"], "env2": ["e=f", "g=h"] }, + merge_config_with_options("section", config, {})) + + def test_xstr(self): + self.assertEqual("", xstr(None)) + self.assertEqual("a", xstr("a")) + +if __name__ == '__main__': + unittest.main()
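
A note on how the modules in this patch fit together: CLUSTER_PROVIDER_MAP and get_cluster() in cluster.py resolve a provider name to a Cluster subclass, and the functions in commands.py then drive that cluster object. The following minimal sketch is illustrative only and is not part of the patch; it uses the "dummy" provider so that no AWS calls are made, and the cluster name and configuration directory are arbitrary placeholders.

  from hadoop.cloud.cluster import get_cluster
  from hadoop.cloud import commands

  # Resolve the provider name to its Cluster subclass
  # (hadoop.cloud.providers.dummy.DummyCluster in this case).
  cluster_class = get_cluster("dummy")
  cluster = cluster_class("my-test-cluster", "/tmp/hadoop-cloud-config")

  # Print the status of instances in the master and slave roles.
  # DummyCluster simply logs the calls it receives.
  commands.list_cluster(cluster)

  # The base Storage class reports no volumes, so this call is a no-op here.
  # With the "ec2" provider it would attach any EBS volumes recorded in the
  # client-side ec2-storage-<cluster name>.json metadata file.
  commands.attach_storage(cluster, commands.ROLES)

With the "ec2" provider the same calls go through Ec2Cluster and Ec2Storage, which use boto's EC2Connection together with the security-group and volume-management code above.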