diff --git a/src/saveVersion.sh b/src/saveVersion.sh new file mode 100755 index 0000000000..c2c067d65d --- /dev/null +++ b/src/saveVersion.sh @@ -0,0 +1,59 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# This file is used to generate the package-info.java class that +# records the version, revision, branch, user, timestamp, and url +unset LANG +unset LC_CTYPE +version=$1 +user=`whoami` +date=`date` +cwd=`pwd` +if [ -d .git ]; then + revision=`git log -1 --pretty=format:"%H"` + hostname=`hostname` + branch=`git branch | sed -n -e 's/^* //p'` + url="git://${hostname}${cwd}" +elif [ -d .svn ]; then + revision=`svn info | sed -n -e 's/Last Changed Rev: \(.*\)/\1/p'` + url=`svn info | sed -n -e 's/URL: \(.*\)/\1/p'` + # Get canonical branch (branches/X, tags/X, or trunk) + branch=`echo $url | sed -n -e 's,.*\(branches/.*\)$,\1,p' \ + -e 's,.*\(tags/.*\)$,\1,p' \ + -e 's,.*trunk$,trunk,p'` +else + revision="Unknown" + branch="Unknown" + url="file://$cwd" +fi +srcChecksum=`find src -name '*.java' | LC_ALL=C sort | xargs md5sum | md5sum | cut -d ' ' -f 1` + +mkdir -p build/src/org/apache/hadoop +cat << EOF | \ + sed -e "s/VERSION/$version/" -e "s/USER/$user/" -e "s/DATE/$date/" \ + -e "s|URL|$url|" -e "s/REV/$revision/" \ + -e "s|BRANCH|$branch|" -e "s/SRCCHECKSUM/$srcChecksum/" \ + > build/src/org/apache/hadoop/package-info.java +/* + * Generated by src/saveVersion.sh + */ +@HadoopVersionAnnotation(version="VERSION", revision="REV", branch="BRANCH", + user="USER", date="DATE", url="URL", + srcChecksum="SRCCHECKSUM") +package org.apache.hadoop; +EOF diff --git a/src/test/bin/test-patch.sh b/src/test/bin/test-patch.sh new file mode 100755 index 0000000000..dc61b8870c --- /dev/null +++ b/src/test/bin/test-patch.sh @@ -0,0 +1,692 @@ +#!/usr/bin/env bash + +#set -x +ulimit -n 1024 + +### Setup some variables. +### JOB_NAME, SVN_REVISION, and BUILD_NUMBER are set by Hudson if it is run by patch process + +############################################################################### +parseArgs() { + case "$1" in + HUDSON) + ### Set HUDSON to true to indicate that this script is being run by Hudson + HUDSON=true + if [[ $# != 19 ]] ; then + echo "ERROR: usage $0 HUDSON " + cleanupAndExit 0 + fi + PATCH_DIR=$2 + SUPPORT_DIR=$3 + PS=$4 + WGET=$5 + JIRACLI=$6 + SVN=$7 + GREP=$8 + PATCH=$9 + FINDBUGS_HOME=${10} + FORREST_HOME=${11} + ECLIPSE_HOME=${12} + PYTHON_HOME=${13} + BASEDIR=${14} + TRIGGER_BUILD_URL=${15} + JIRA_PASSWD=${16} + JAVA5_HOME=${17} + CURL=${18} + defect=${19} + + ### Retrieve the defect number + if [ -z "$defect" ] ; then + echo "Could not determine the patch to test. Exiting." + cleanupAndExit 0 + fi + + if [ ! 
-e "$PATCH_DIR" ] ; then + mkdir -p $PATCH_DIR + fi + + ECLIPSE_PROPERTY="-Declipse.home=$ECLIPSE_HOME" + PYTHON_PROPERTY="-Dpython.home=$PYTHON_HOME" + ;; + DEVELOPER) + ### Set HUDSON to false to indicate that this script is being run by a developer + HUDSON=false + if [[ $# != 10 ]] ; then + echo "ERROR: usage $0 DEVELOPER " + cleanupAndExit 0 + fi + ### PATCH_FILE contains the location of the patchfile + PATCH_FILE=$2 + if [[ ! -e "$PATCH_FILE" ]] ; then + echo "Unable to locate the patch file $PATCH_FILE" + cleanupAndExit 0 + fi + PATCH_DIR=$3 + ### Check if $PATCH_DIR exists. If it does not exist, create a new directory + if [[ ! -e "$PATCH_DIR" ]] ; then + mkdir "$PATCH_DIR" + if [[ $? == 0 ]] ; then + echo "$PATCH_DIR has been created" + else + echo "Unable to create $PATCH_DIR" + cleanupAndExit 0 + fi + fi + SVN=$4 + GREP=$5 + PATCH=$6 + FINDBUGS_HOME=$7 + FORREST_HOME=$8 + BASEDIR=$9 + JAVA5_HOME=${10} + ### Obtain the patch filename to append it to the version number + defect=`basename $PATCH_FILE` + ;; + *) + echo "ERROR: usage $0 HUDSON [args] | DEVELOPER [args]" + cleanupAndExit 0 + ;; + esac +} + +############################################################################### +checkout () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Testing patch for ${defect}." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + ### When run by a developer, if the workspace contains modifications, do not continue + status=`$SVN stat` + if [[ $HUDSON == "false" ]] ; then + if [[ "$status" != "" ]] ; then + echo "ERROR: can't run in a workspace that contains the following modifications" + echo "$status" + cleanupAndExit 1 + fi + else + cd $BASEDIR + $SVN revert -R . + rm -rf `$SVN status --no-ignore` + $SVN update + fi + return $? +} + +############################################################################### +setup () { + ### Download latest patch file (ignoring .htm and .html) when run from patch process + if [[ $HUDSON == "true" ]] ; then + $WGET -q -O $PATCH_DIR/jira http://issues.apache.org/jira/browse/$defect + if [[ `$GREP -c 'Patch Available' $PATCH_DIR/jira` == 0 ]] ; then + echo "$defect is not \"Patch Available\". Exiting." + cleanupAndExit 0 + fi + relativePatchURL=`$GREP -o '"/jira/secure/attachment/[0-9]*/[^"]*' $PATCH_DIR/jira | $GREP -v -e 'htm[l]*$' | sort | tail -1 | $GREP -o '/jira/secure/attachment/[0-9]*/[^"]*'` + patchURL="http://issues.apache.org${relativePatchURL}" + patchNum=`echo $patchURL | $GREP -o '[0-9]*/' | $GREP -o '[0-9]*'` + echo "$defect patch is being downloaded at `date` from" + echo "$patchURL" + $WGET -q -O $PATCH_DIR/patch $patchURL + VERSION=${SVN_REVISION}_${defect}_PATCH-${patchNum} + JIRA_COMMENT="Here are the results of testing the latest attachment + $patchURL + against trunk revision ${SVN_REVISION}." + + ### Copy in any supporting files needed by this process + cp -r $SUPPORT_DIR/lib/* ./lib + #PENDING: cp -f $SUPPORT_DIR/etc/checkstyle* ./src/test + ### Copy the patch file to $PATCH_DIR + else + VERSION=PATCH-${defect} + cp $PATCH_FILE $PATCH_DIR/patch + if [[ $? 
== 0 ]] ; then + echo "Patch file $PATCH_FILE copied to $PATCH_DIR" + else + echo "Could not copy $PATCH_FILE to $PATCH_DIR" + cleanupAndExit 0 + fi + fi + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Pre-building trunk to determine trunk number" + echo " of release audit, javac, and Findbugs warnings." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= releaseaudit > $PATCH_DIR/trunkReleaseAuditWarnings.txt 2>&1" + $ANT_HOME/bin/ant -Dversion="${VERSION}" -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= releaseaudit > $PATCH_DIR/trunkReleaseAuditWarnings.txt 2>&1 + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -Djavac.args="-Xlint -Xmaxwarns 1000" $ECLIPSE_PROPERTY -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= clean tar > $PATCH_DIR/trunkJavacWarnings.txt 2>&1" + $ANT_HOME/bin/ant -Dversion="${VERSION}" -Djavac.args="-Xlint -Xmaxwarns 1000" $ECLIPSE_PROPERTY -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= clean tar > $PATCH_DIR/trunkJavacWarnings.txt 2>&1 + if [[ $? != 0 ]] ; then + echo "Trunk compilation is broken?" + cleanupAndExit 1 + fi + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -Dfindbugs.home=$FINDBUGS_HOME -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= findbugs > /dev/null 2>&1" + $ANT_HOME/bin/ant -Dversion="${VERSION}" -Dfindbugs.home=$FINDBUGS_HOME -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= findbugs > /dev/null 2>&1 + if [[ $? != 0 ]] ; then + echo "Trunk findbugs is broken?" + cleanupAndExit 1 + fi + cp $BASEDIR/build/test/findbugs/*.xml $PATCH_DIR/trunkFindbugsWarnings.xml +} + +############################################################################### +### Check for @author tags in the patch +checkAuthor () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Checking there are no @author tags in the patch." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + authorTags=`$GREP -c -i '@author' $PATCH_DIR/patch` + echo "There appear to be $authorTags @author tags in the patch." + if [[ $authorTags != 0 ]] ; then + JIRA_COMMENT="$JIRA_COMMENT + + -1 @author. The patch appears to contain $authorTags @author tags which the Hadoop community has agreed to not allow in code contributions." + return 1 + fi + JIRA_COMMENT="$JIRA_COMMENT + + +1 @author. The patch does not contain any @author tags." + return 0 +} + +############################################################################### +### Check for tests in the patch +checkTests () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Checking there are new or changed tests in the patch." 
+ echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + testReferences=`$GREP -c -i '/test' $PATCH_DIR/patch` + echo "There appear to be $testReferences test files referenced in the patch." + if [[ $testReferences == 0 ]] ; then + if [[ $HUDSON == "true" ]] ; then + patchIsDoc=`$GREP -c -i 'title="documentation' $PATCH_DIR/jira` + if [[ $patchIsDoc != 0 ]] ; then + echo "The patch appears to be a documentation patch that doesn't require tests." + JIRA_COMMENT="$JIRA_COMMENT + + +0 tests included. The patch appears to be a documentation patch that doesn't require tests." + return 0 + fi + fi + JIRA_COMMENT="$JIRA_COMMENT + + -1 tests included. The patch doesn't appear to include any new or modified tests. + Please justify why no tests are needed for this patch." + return 1 + fi + JIRA_COMMENT="$JIRA_COMMENT + + +1 tests included. The patch appears to include $testReferences new or modified tests." + return 0 +} + +############################################################################### +### Attempt to apply the patch +applyPatch () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Applying patch." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + $PATCH -E -p0 < $PATCH_DIR/patch + if [[ $? != 0 ]] ; then + echo "PATCH APPLICATION FAILED" + JIRA_COMMENT="$JIRA_COMMENT + + -1 patch. The patch command could not apply the patch." + return 1 + fi + return 0 +} + +############################################################################### +### Check there are no javadoc warnings +checkJavadocWarnings () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Determining number of patched javadoc warnings." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -DHadoopPatchProcess= clean javadoc | tee $PATCH_DIR/patchJavadocWarnings.txt" + $ANT_HOME/bin/ant -Dversion="${VERSION}" -DHadoopPatchProcess= clean javadoc | tee $PATCH_DIR/patchJavadocWarnings.txt + javadocWarnings=`$GREP -c '\[javadoc\] [0-9]* warning' $PATCH_DIR/patchJavadocWarnings.txt` + echo "" + echo "" + echo "There appear to be $javadocWarnings javadoc warnings generated by the patched build." + if [[ $javadocWarnings != 0 ]] ; then + JIRA_COMMENT="$JIRA_COMMENT + + -1 javadoc. The javadoc tool appears to have generated $javadocWarnings warning messages." + return 1 + fi + JIRA_COMMENT="$JIRA_COMMENT + + +1 javadoc. The javadoc tool did not generate any warning messages." +return 0 +} + +############################################################################### +### Check there are no changes in the number of Javac warnings +checkJavacWarnings () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Determining number of patched javac warnings." 
+ echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -Djavac.args="-Xlint -Xmaxwarns 1000" $ECLIPSE_PROPERTY -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= tar > $PATCH_DIR/patchJavacWarnings.txt 2>&1" + $ANT_HOME/bin/ant -Dversion="${VERSION}" -Djavac.args="-Xlint -Xmaxwarns 1000" $ECLIPSE_PROPERTY -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= tar > $PATCH_DIR/patchJavacWarnings.txt 2>&1 + + ### Compare trunk and patch javac warning numbers + if [[ -f $PATCH_DIR/patchJavacWarnings.txt ]] ; then + trunkJavacWarnings=`$GREP -o '\[javac\] [0-9]* warning' $PATCH_DIR/trunkJavacWarnings.txt | awk '{total += $2} END {print total}'` + patchJavacWarnings=`$GREP -o '\[javac\] [0-9]* warning' $PATCH_DIR/patchJavacWarnings.txt | awk '{total += $2} END {print total}'` + echo "There appear to be $trunkJavacWarnings javac compiler warnings before the patch and $patchJavacWarnings javac compiler warnings after applying the patch." + if [[ $patchJavacWarnings != "" && $trunkJavacWarnings != "" ]] ; then + if [[ $patchJavacWarnings > $trunkJavacWarnings ]] ; then + JIRA_COMMENT="$JIRA_COMMENT + + -1 javac. The applied patch generated $patchJavacWarnings javac compiler warnings (more than the trunk's current $trunkJavacWarnings warnings)." + return 1 + fi + fi + fi + JIRA_COMMENT="$JIRA_COMMENT + + +1 javac. The applied patch does not increase the total number of javac compiler warnings." + return 0 +} + +############################################################################### +### Check there are no changes in the number of release audit (RAT) warnings +checkReleaseAuditWarnings () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Determining number of patched release audit warnings." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= releaseaudit > $PATCH_DIR/patchReleaseAuditWarnings.txt 2>&1" + $ANT_HOME/bin/ant -Dversion="${VERSION}" -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= releaseaudit > $PATCH_DIR/patchReleaseAuditWarnings.txt 2>&1 + + ### Compare trunk and patch release audit warning numbers + if [[ -f $PATCH_DIR/patchReleaseAuditWarnings.txt ]] ; then + trunkReleaseAuditWarnings=`$GREP -c '\!?????' $PATCH_DIR/trunkReleaseAuditWarnings.txt` + patchReleaseAuditWarnings=`$GREP -c '\!?????' $PATCH_DIR/patchReleaseAuditWarnings.txt` + echo "" + echo "" + echo "There appear to be $trunkReleaseAuditWarnings release audit warnings before the patch and $patchReleaseAuditWarnings release audit warnings after applying the patch." + if [[ $patchReleaseAuditWarnings != "" && $trunkReleaseAuditWarnings != "" ]] ; then + if [[ $patchReleaseAuditWarnings > $trunkReleaseAuditWarnings ]] ; then + JIRA_COMMENT="$JIRA_COMMENT + + -1 release audit. The applied patch generated $patchReleaseAuditWarnings release audit warnings (more than the trunk's current $trunkReleaseAuditWarnings warnings)." + $GREP '\!?????' 
$PATCH_DIR/patchReleaseAuditWarnings.txt > $PATCH_DIR/patchReleaseAuditProblems.txt + $GREP '\!?????' $PATCH_DIR/trunkReleaseAuditWarnings.txt > $PATCH_DIR/trunkReleaseAuditProblems.txt + echo "A diff of patched release audit warnings with trunk release audit warnings." > $PATCH_DIR/releaseAuditDiffWarnings.txt + echo "Lines that start with ????? in the release audit report indicate files that do not have an Apache license header." > $PATCH_DIR/releaseAuditDiffWarnings.txt + echo "" > $PATCH_DIR/releaseAuditDiffWarnings.txt + diff $PATCH_DIR/patchReleaseAuditProblems.txt $PATCH_DIR/trunkReleaseAuditProblems.txt >> $PATCH_DIR/releaseAuditDiffWarnings.txt + JIRA_COMMENT_FOOTER="Release audit warnings: http://hudson.zones.apache.org/hudson/job/$JOB_NAME/$BUILD_NUMBER/artifact/trunk/current/releaseAuditDiffWarnings.txt +$JIRA_COMMENT_FOOTER" + return 1 + fi + fi + fi + JIRA_COMMENT="$JIRA_COMMENT + + +1 release audit. The applied patch does not increase the total number of release audit warnings." + return 0 +} + +############################################################################### +### Check there are no changes in the number of Checkstyle warnings +checkStyle () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Determining number of patched checkstyle warnings." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + echo "THIS IS NOT IMPLEMENTED YET" + echo "" + echo "" + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -DHadoopPatchProcess= checkstyle" + $ANT_HOME/bin/ant -Dversion="${VERSION}" -DHadoopPatchProcess= checkstyle + JIRA_COMMENT_FOOTER="Checkstyle results: http://hudson.zones.apache.org/hudson/job/$JOB_NAME/$BUILD_NUMBER/artifact/trunk/build/test/checkstyle-errors.html +$JIRA_COMMENT_FOOTER" + ### TODO: calculate actual patchStyleErrors +# patchStyleErrors=0 +# if [[ $patchStyleErrors != 0 ]] ; then +# JIRA_COMMENT="$JIRA_COMMENT +# +# -1 checkstyle. The patch generated $patchStyleErrors code style errors." +# return 1 +# fi +# JIRA_COMMENT="$JIRA_COMMENT +# +# +1 checkstyle. The patch generated 0 code style errors." + return 0 +} + +############################################################################### +### Check there are no changes in the number of Findbugs warnings +checkFindbugsWarnings () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Determining number of patched Findbugs warnings." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -Dfindbugs.home=$FINDBUGS_HOME -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= findbugs" + $ANT_HOME/bin/ant -Dversion="${VERSION}" -Dfindbugs.home=$FINDBUGS_HOME -Djava5.home=${JAVA5_HOME} -Dforrest.home=${FORREST_HOME} -DHadoopPatchProcess= findbugs + if [ $? != 0 ] ; then + JIRA_COMMENT="$JIRA_COMMENT + + -1 findbugs. The patch appears to cause Findbugs to fail." 
+ return 1 + fi +JIRA_COMMENT_FOOTER="Findbugs warnings: http://hudson.zones.apache.org/hudson/job/$JOB_NAME/$BUILD_NUMBER/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html +$JIRA_COMMENT_FOOTER" + cp $BASEDIR/build/test/findbugs/*.xml $PATCH_DIR/patchFindbugsWarnings.xml +$FINDBUGS_HOME/bin/setBugDatabaseInfo -timestamp "01/01/1999" \ + $PATCH_DIR/trunkFindbugsWarnings.xml \ + $PATCH_DIR/trunkFindbugsWarnings.xml + $FINDBUGS_HOME/bin/setBugDatabaseInfo -timestamp "01/01/2000" \ + $PATCH_DIR/patchFindbugsWarnings.xml \ + $PATCH_DIR/patchFindbugsWarnings.xml + $FINDBUGS_HOME/bin/computeBugHistory -output $PATCH_DIR/findbugsMerge.xml \ + $PATCH_DIR/trunkFindbugsWarnings.xml \ + $PATCH_DIR/patchFindbugsWarnings.xml + findbugsWarnings=`$FINDBUGS_HOME/bin/filterBugs -first "01/01/2000" $PATCH_DIR/findbugsMerge.xml \ + $BASEDIR/build/test/findbugs/newPatchFindbugsWarnings.xml | /usr/bin/awk '{print $1}'` + $FINDBUGS_HOME/bin/convertXmlToText -html \ + $BASEDIR/build/test/findbugs/newPatchFindbugsWarnings.xml \ + $BASEDIR/build/test/findbugs/newPatchFindbugsWarnings.html + cp $BASEDIR/build/test/findbugs/newPatchFindbugsWarnings.html $PATCH_DIR/newPatchFindbugsWarnings.html + cp $BASEDIR/build/test/findbugs/newPatchFindbugsWarnings.xml $PATCH_DIR/newPatchFindbugsWarnings.xml + if [[ $findbugsWarnings != 0 ]] ; then + JIRA_COMMENT="$JIRA_COMMENT + + -1 findbugs. The patch appears to introduce $findbugsWarnings new Findbugs warnings." + return 1 + fi + JIRA_COMMENT="$JIRA_COMMENT + + +1 findbugs. The patch does not introduce any new Findbugs warnings." + return 0 +} + +############################################################################### +### Run the test-core target +runCoreTests () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Running core tests." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + + ### Kill any rogue build processes from the last attempt + $PS -auxwww | $GREP HadoopPatchProcess | /usr/bin/nawk '{print $2}' | /usr/bin/xargs -t -I {} /usr/bin/kill -9 {} > /dev/null + + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" -DHadoopPatchProcess= -Dtest.junit.output.format=xml -Dtest.output=yes -Dcompile.c++=yes -Dforrest.home=$FORREST_HOME -Djava5.home=$JAVA5_HOME create-c++-configure test-core" + $ANT_HOME/bin/ant -Dversion="${VERSION}" -DHadoopPatchProcess= -Dtest.junit.output.format=xml -Dtest.output=yes -Dcompile.c++=yes -Dforrest.home=$FORREST_HOME -Djava5.home=$JAVA5_HOME create-c++-configure test-core + if [[ $? != 0 ]] ; then + JIRA_COMMENT="$JIRA_COMMENT + + -1 core tests. The patch failed core unit tests." + return 1 + fi + JIRA_COMMENT="$JIRA_COMMENT + + +1 core tests. The patch passed core unit tests." 
+ return 0 +} + +############################################################################### +### Tests parts of contrib specific to the eclipse files +checkJarFilesDeclaredInEclipse () { + export DECLARED_JARS=$(sed -n 's@.*kind="lib".*path="\(.*jar\)".*@\1@p' < .eclipse.templates/.classpath) + export PRESENT_JARS=$(find build/ivy/lib/Hadoop/common/ lib/ src/test/lib/ -name '*.jar' |sort) + # When run by Hudson, consider libs from ${SUPPORT_DIR} declared + if [[ ${HUDSON} == "true" ]]; then + DECLARED_JARS="${DECLARED_JARS} $(cd "${SUPPORT_DIR}"; find lib -name '*.jar')" + fi + DECLARED_JARS=$(sed 'y/ /\n/' <<< ${DECLARED_JARS} | sort) + export ECLIPSE_DECLARED_SRC=$(sed -n 's@.*kind="src".*path="\(.*\)".*@\1@p' < .eclipse.templates/.classpath |sort) + + if [ "${DECLARED_JARS}" != "${PRESENT_JARS}" ]; then + echo " +FAILED. Some jars are not declared in the Eclipse project. + Declared jars: ${DECLARED_JARS} + Present jars: ${PRESENT_JARS}" + return 1 + fi + for dir in $ECLIPSE_DECLARED_SRC; do + [ '!' -d $dir ] && echo " +FAILED: $dir is referenced in the Eclipse project although it doesn't exists anymore." && return 1 + done + return 0 +} + +checkEclipse () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Running Eclipse classpath verification." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + + checkJarFilesDeclaredInEclipse + if [[ $? != 0 ]] ; then + JIRA_COMMENT="$JIRA_COMMENT + + -1 Eclipse classpath. The patch causes the Eclipse classpath to differ from the contents of the lib directories." + return 1 + fi + JIRA_COMMENT="$JIRA_COMMENT + + +1 Eclipse classpath. The patch retains Eclipse classpath integrity." + return 0 +} +############################################################################### +### Run the test-contrib target +runContribTests () { + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Running contrib tests." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + + ### Kill any rogue build processes from the last attempt + $PS -auxwww | $GREP HadoopPatchProcess | /usr/bin/nawk '{print $2}' | /usr/bin/xargs -t -I {} /usr/bin/kill -9 {} > /dev/null + + echo "$ANT_HOME/bin/ant -Dversion="${VERSION}" $ECLIPSE_PROPERTY $PYTHON_PROPERTY -DHadoopPatchProcess= -Dtest.junit.output.format=xml -Dtest.output=yes test-contrib" + $ANT_HOME/bin/ant -Dversion="${VERSION}" $ECLIPSE_PROPERTY $PYTHON_PROPERTY -DHadoopPatchProcess= -Dtest.junit.output.format=xml -Dtest.output=yes test-contrib + if [[ $? != 0 ]] ; then + JIRA_COMMENT="$JIRA_COMMENT + + -1 contrib tests. The patch failed contrib unit tests." + return 1 + fi + JIRA_COMMENT="$JIRA_COMMENT + + +1 contrib tests. The patch passed contrib unit tests." 
+ return 0 +} + +############################################################################### +### Submit a comment to the defect's Jira +submitJiraComment () { + local result=$1 + ### Do not output the value of JIRA_COMMENT_FOOTER when run by a developer + if [[ $HUDSON == "false" ]] ; then + JIRA_COMMENT_FOOTER="" + fi + if [[ $result == 0 ]] ; then + comment="+1 overall. $JIRA_COMMENT + +$JIRA_COMMENT_FOOTER" + else + comment="-1 overall. $JIRA_COMMENT + +$JIRA_COMMENT_FOOTER" + fi + ### Output the test result to the console + echo " + + + +$comment" + + if [[ $HUDSON == "true" ]] ; then + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Adding comment to Jira." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + + ### Update Jira with a comment + export USER=hudson + $JIRACLI -s issues.apache.org/jira login hadoopqa $JIRA_PASSWD + $JIRACLI -s issues.apache.org/jira comment $defect "$comment" + $JIRACLI -s issues.apache.org/jira logout + fi +} + +############################################################################### +### Cleanup files +cleanupAndExit () { + local result=$1 + if [[ $HUDSON == "true" ]] ; then + if [ -e "$PATCH_DIR" ] ; then + mv $PATCH_DIR $BASEDIR + fi + CALLER=`hostname` + $CURL $PATCH_ADMIN_URL'&CALLER='$CALLER + fi + echo "" + echo "" + echo "======================================================================" + echo "======================================================================" + echo " Finished build." + echo "======================================================================" + echo "======================================================================" + echo "" + echo "" + exit $result +} + +############################################################################### +############################################################################### +############################################################################### + +JIRA_COMMENT="" +JIRA_COMMENT_FOOTER="Console output: http://hudson.zones.apache.org/hudson/job/$JOB_NAME/$BUILD_NUMBER/console + +This message is automatically generated." + +### Check if arguments to the script have been specified properly or not +parseArgs $@ +cd $BASEDIR + +checkout +RESULT=$? +if [[ $HUDSON == "true" ]] ; then + if [[ $RESULT != 0 ]] ; then + ### Resubmit build. + $CURL $TRIGGER_BUILD_URL'&DEFECTNUM='$defect + exit 100 + fi +fi +setup +checkAuthor +RESULT=$? + +checkTests +(( RESULT = RESULT + $? )) +applyPatch +if [[ $? != 0 ]] ; then + submitJiraComment 1 + cleanupAndExit 1 +fi +checkJavadocWarnings +(( RESULT = RESULT + $? )) +checkJavacWarnings +(( RESULT = RESULT + $? )) +checkStyle +(( RESULT = RESULT + $? )) +checkFindbugsWarnings +(( RESULT = RESULT + $? )) +checkEclipse +(( RESULT = RESULT + $? )) +checkReleaseAuditWarnings +(( RESULT = RESULT + $? )) +### Do not call these when run by a developer +if [[ $HUDSON == "true" ]] ; then + runCoreTests + (( RESULT = RESULT + $? )) + runContribTests + (( RESULT = RESULT + $? 
)) +fi +JIRA_COMMENT_FOOTER="Test results: http://hudson.zones.apache.org/hudson/job/$JOB_NAME/$BUILD_NUMBER/testReport/ +$JIRA_COMMENT_FOOTER" + +submitJiraComment $RESULT +cleanupAndExit $RESULT diff --git a/src/test/checkstyle-noframes-sorted.xsl b/src/test/checkstyle-noframes-sorted.xsl new file mode 100644 index 0000000000..5f9e93ba27 --- /dev/null +++ b/src/test/checkstyle-noframes-sorted.xsl @@ -0,0 +1,178 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ [stylesheet body garbled in extraction: only the literal text of checkstyle-noframes-sorted.xsl survives. The sheet renders a Checkstyle XML report as a single sorted, frameless HTML page titled "CheckStyle Audit" ("Designed for use with CheckStyle and Ant."), with a per-file "Files" table (Name, Errors), a per-file error-detail table (Error Description, Line) with "Back to top" links, and an overall "Summary" table (Files, Errors); the surrounding XSL markup is not recoverable here.]
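For context (an editor's sketch, not part of this patch): a no-frames, sorted stylesheet like this is typically applied to the XML report written by the Checkstyle Ant task, producing the build/test/checkstyle-errors.html page that the checkStyle() step in test-patch.sh links to. A minimal Ant wiring might look like the following, where the target name and the checkstyle-errors.xml input path are assumptions rather than values taken from this build:

  <target name="checkstyle-report">
    <!-- transform the raw Checkstyle XML report into the sorted, no-frames HTML page -->
    <xslt in="build/test/checkstyle-errors.xml"
          out="build/test/checkstyle-errors.html"
          style="src/test/checkstyle-noframes-sorted.xsl"/>
  </target>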
+ + diff --git a/src/test/checkstyle.xml b/src/test/checkstyle.xml new file mode 100644 index 0000000000..5e3b89466c --- /dev/null +++ b/src/test/checkstyle.xml @@ -0,0 +1,170 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/test/core-site.xml b/src/test/core-site.xml new file mode 100644 index 0000000000..d33d4d2cff --- /dev/null +++ b/src/test/core-site.xml @@ -0,0 +1,50 @@ + + + + + + + + + + + + hadoop.tmp.dir + build/test + A base for other temporary directories. + true + + + + test.fs.s3.name + s3:/// + The name of the s3 file system for testing. + + + + fs.s3.block.size + 128 + Size of a block in bytes. + + + + fs.ftp.user.localhost + user + The username for connecting to FTP server running on localhost. + This is required by FTPFileSystem + + + + fs.ftp.password.localhost + password + The password for connecting to FTP server running on localhost. + This is required by FTPFileSystem + + + + test.fs.s3n.name + s3n:/// + The name of the s3n file system for testing. + + + diff --git a/src/test/ddl/buffer.jr b/src/test/ddl/buffer.jr new file mode 100644 index 0000000000..797aa67bb3 --- /dev/null +++ b/src/test/ddl/buffer.jr @@ -0,0 +1,6 @@ +module org.apache.hadoop.record { + class RecBuffer { + buffer data; + } +} + diff --git a/src/test/ddl/int.jr b/src/test/ddl/int.jr new file mode 100644 index 0000000000..61eae7f3b7 --- /dev/null +++ b/src/test/ddl/int.jr @@ -0,0 +1,6 @@ +module org.apache.hadoop.record { + class RecInt { + int data; + } +} + diff --git a/src/test/ddl/string.jr b/src/test/ddl/string.jr new file mode 100644 index 0000000000..cdd3e70a8a --- /dev/null +++ b/src/test/ddl/string.jr @@ -0,0 +1,6 @@ +module org.apache.hadoop.record { + class RecString { + ustring data; + } +} + diff --git a/src/test/ddl/test.jr b/src/test/ddl/test.jr new file mode 100644 index 0000000000..46c181d1c8 --- /dev/null +++ b/src/test/ddl/test.jr @@ -0,0 +1,46 @@ +module org.apache.hadoop.record { + class RecRecord0 { + ustring stringVal; + } + + class RecRecord1 { + boolean boolVal; + byte byteVal; + int intVal; + long longVal; + float floatVal; // testing inline comment + double doubleVal; /* testing comment */ + ustring stringVal; /* testing multi-line + * comment */ + buffer bufferVal; // testing another // inline comment + vector vectorVal; + map mapVal; + RecRecord0 recordVal; + } + + class RecRecordOld { + ustring name; + vector ivec; + vector> svec; + RecRecord0 inner; + vector>> strvec; + float i1; + map map1; + vector> mvec1; + vector> mvec2; + } + + /* RecRecordNew is a lot like RecRecordOld. Helps test for versioning. 
*/ + class RecRecordNew { + ustring name2; + RecRecord0 inner; + vector ivec; + vector> svec; + vector>> strvec; + int i1; + map map1; + vector> mvec2; + } + +} + diff --git a/src/test/findbugsExcludeFile.xml b/src/test/findbugsExcludeFile.xml new file mode 100644 index 0000000000..4ff3500db6 --- /dev/null +++ b/src/test/findbugsExcludeFile.xml @@ -0,0 +1,216 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/test/hadoop-policy.xml b/src/test/hadoop-policy.xml new file mode 100644 index 0000000000..f57800f64d --- /dev/null +++ b/src/test/hadoop-policy.xml @@ -0,0 +1,97 @@ + + + + + + + + security.client.protocol.acl + * + ACL for ClientProtocol, which is used by user code + via the DistributedFileSystem. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.client.datanode.protocol.acl + * + ACL for ClientDatanodeProtocol, the client-to-datanode protocol + for block recovery. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.datanode.protocol.acl + * + ACL for DatanodeProtocol, which is used by datanodes to + communicate with the namenode. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.inter.datanode.protocol.acl + * + ACL for InterDatanodeProtocol, the inter-datanode protocol + for updating generation timestamp. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.namenode.protocol.acl + * + ACL for NamenodeProtocol, the protocol used by the secondary + namenode to communicate with the namenode. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.inter.tracker.protocol.acl + * + ACL for InterTrackerProtocol, used by the tasktrackers to + communicate with the jobtracker. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.job.submission.protocol.acl + * + ACL for JobSubmissionProtocol, used by job clients to + communciate with the jobtracker for job submission, querying job status etc. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.task.umbilical.protocol.acl + * + ACL for TaskUmbilicalProtocol, used by the map and reduce + tasks to communicate with the parent tasktracker. 
+ The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + + security.refresh.policy.protocol.acl + ${user.name} + ACL for RefreshAuthorizationPolicyProtocol, used by the + dfsadmin and mradmin commands to refresh the security policy in-effect. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed. + + + diff --git a/src/test/hadoop-site.xml b/src/test/hadoop-site.xml new file mode 100644 index 0000000000..352c4ff01e --- /dev/null +++ b/src/test/hadoop-site.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + diff --git a/src/test/hdfs-site.xml b/src/test/hdfs-site.xml new file mode 100644 index 0000000000..cbd6ab6eff --- /dev/null +++ b/src/test/hdfs-site.xml @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java new file mode 100644 index 0000000000..8c00ac8b8c --- /dev/null +++ b/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +/** + * Reducer that accumulates values based on their type. + *

+ * The type is specified in the key part of the key-value pair
+ * as a prefix to the key in the following way:
+ * <p>
+ * type:key
+ * <p>
+ * The values are accumulated according to the types:
+ * <ul>
+ * <li>s: - string, concatenate</li>
+ * <li>f: - float, sum</li>
+ * <li>l: - long, sum</li>
+ * </ul>
+ * + */ +public class AccumulatingReducer extends MapReduceBase + implements Reducer { + static final String VALUE_TYPE_LONG = "l:"; + static final String VALUE_TYPE_FLOAT = "f:"; + static final String VALUE_TYPE_STRING = "s:"; + private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class); + + protected String hostName; + + public AccumulatingReducer () { + LOG.info("Starting AccumulatingReducer !!!"); + try { + hostName = java.net.InetAddress.getLocalHost().getHostName(); + } catch(Exception e) { + hostName = "localhost"; + } + LOG.info("Starting AccumulatingReducer on " + hostName); + } + + public void reduce(Text key, + Iterator values, + OutputCollector output, + Reporter reporter + ) throws IOException { + String field = key.toString(); + + reporter.setStatus("starting " + field + " ::host = " + hostName); + + // concatenate strings + if (field.startsWith(VALUE_TYPE_STRING)) { + String sSum = ""; + while (values.hasNext()) + sSum += values.next().toString() + ";"; + output.collect(key, new Text(sSum)); + reporter.setStatus("finished " + field + " ::host = " + hostName); + return; + } + // sum long values + if (field.startsWith(VALUE_TYPE_FLOAT)) { + float fSum = 0; + while (values.hasNext()) + fSum += Float.parseFloat(values.next().toString()); + output.collect(key, new Text(String.valueOf(fSum))); + reporter.setStatus("finished " + field + " ::host = " + hostName); + return; + } + // sum long values + if (field.startsWith(VALUE_TYPE_LONG)) { + long lSum = 0; + while (values.hasNext()) { + lSum += Long.parseLong(values.next().toString()); + } + output.collect(key, new Text(String.valueOf(lSum))); + } + reporter.setStatus("finished " + field + " ::host = " + hostName); + } +} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java new file mode 100644 index 0000000000..7bc3e6a343 --- /dev/null +++ b/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java @@ -0,0 +1,551 @@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.util.Date; +import java.util.StringTokenizer; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.*; + +/** + * Distributed i/o benchmark. + *

+ * This test writes into or reads from a specified number of files.
+ * File size is specified as a parameter to the test.
+ * Each file is accessed in a separate map task.
+ * <p>
+ * The reducer collects the following statistics:
+ * <ul>
+ * <li>number of tasks completed</li>
+ * <li>number of bytes written/read</li>
+ * <li>execution time</li>
+ * <li>io rate</li>
+ * <li>io rate squared</li>
+ * </ul>
+ *
+ * Finally, the following information is appended to a local file
+ * <ul>
+ * <li>read or write test</li>
+ * <li>date and time the test finished</li>
+ * <li>number of files</li>
+ * <li>total number of bytes processed</li>
+ * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
+ * <li>average i/o rate in mb/sec per file</li>
+ * <li>standard i/o rate deviation</li>
+ * </ul>
+ */ +public class DFSCIOTest extends TestCase { + // Constants + private static final Log LOG = LogFactory.getLog(DFSCIOTest.class); + private static final int TEST_TYPE_READ = 0; + private static final int TEST_TYPE_WRITE = 1; + private static final int TEST_TYPE_CLEANUP = 2; + private static final int DEFAULT_BUFFER_SIZE = 1000000; + private static final String BASE_FILE_NAME = "test_io_"; + private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log"; + + private static Configuration fsConfig = new Configuration(); + private static final long MEGA = 0x100000; + private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest"); + private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); + private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write"); + private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); + private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); + + private static Path HDFS_TEST_DIR = new Path("/tmp/DFSCIOTest"); + private static String HDFS_LIB_VERSION = System.getProperty("libhdfs.version", "1"); + private static String CHMOD = new String("chmod"); + private static Path HDFS_SHLIB = new Path(HDFS_TEST_DIR + "/libhdfs.so." + HDFS_LIB_VERSION); + private static Path HDFS_READ = new Path(HDFS_TEST_DIR + "/hdfs_read"); + private static Path HDFS_WRITE = new Path(HDFS_TEST_DIR + "/hdfs_write"); + + /** + * Run the test with default parameters. + * + * @throws Exception + */ + public void testIOs() throws Exception { + testIOs(10, 10); + } + + /** + * Run the test with the specified parameters. + * + * @param fileSize file size + * @param nrFiles number of files + * @throws IOException + */ + public static void testIOs(int fileSize, int nrFiles) + throws IOException { + + FileSystem fs = FileSystem.get(fsConfig); + + createControlFile(fs, fileSize, nrFiles); + writeTest(fs); + readTest(fs); + } + + private static void createControlFile( + FileSystem fs, + int fileSize, // in MB + int nrFiles + ) throws IOException { + LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); + + fs.delete(CONTROL_DIR, true); + + for(int i=0; i < nrFiles; i++) { + String name = getFileName(i); + Path controlFile = new Path(CONTROL_DIR, "in_file_" + name); + SequenceFile.Writer writer = null; + try { + writer = SequenceFile.createWriter(fs, fsConfig, controlFile, + Text.class, LongWritable.class, + CompressionType.NONE); + writer.append(new Text(name), new LongWritable(fileSize)); + } catch(Exception e) { + throw new IOException(e.getLocalizedMessage()); + } finally { + if (writer != null) + writer.close(); + writer = null; + } + } + LOG.info("created control files for: "+nrFiles+" files"); + } + + private static String getFileName(int fIdx) { + return BASE_FILE_NAME + Integer.toString(fIdx); + } + + /** + * Write/Read mapper base class. + *

+ * Collects the following statistics per task:
+ * <ul>
+ * <li>number of tasks completed</li>
+ * <li>number of bytes written/read</li>
+ * <li>execution time</li>
+ * <li>i/o rate</li>
+ * <li>i/o rate squared</li>
+ * </ul>
+ */ + private abstract static class IOStatMapper extends IOMapperBase { + IOStatMapper() { + super(fsConfig); + } + + void collectStats(OutputCollector output, + String name, + long execTime, + Object objSize) throws IOException { + long totalSize = ((Long)objSize).longValue(); + float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); + LOG.info("Number of bytes processed = " + totalSize); + LOG.info("Exec time = " + execTime); + LOG.info("IO rate = " + ioRateMbSec); + + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"), + new Text(String.valueOf(1))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), + new Text(String.valueOf(totalSize))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), + new Text(String.valueOf(execTime))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), + new Text(String.valueOf(ioRateMbSec*1000))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"), + new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); + } + } + + /** + * Write mapper class. + */ + public static class WriteMapper extends IOStatMapper { + + public WriteMapper() { + super(); + for(int i=0; i < bufferSize; i++) + buffer[i] = (byte)('0' + i % 50); + } + + public Object doIO(Reporter reporter, + String name, + long totalSize + ) throws IOException { + // create file + totalSize *= MEGA; + + // create instance of local filesystem + FileSystem localFS = FileSystem.getLocal(fsConfig); + + try { + // native runtime + Runtime runTime = Runtime.getRuntime(); + + // copy the dso and executable from dfs and chmod them + synchronized (this) { + localFS.delete(HDFS_TEST_DIR, true); + if (!(localFS.mkdirs(HDFS_TEST_DIR))) { + throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); + } + } + + synchronized (this) { + if (!localFS.exists(HDFS_SHLIB)) { + FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig); + + String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB); + Process process = runTime.exec(chmodCmd); + int exitStatus = process.waitFor(); + if (exitStatus != 0) { + throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); + } + } + } + + synchronized (this) { + if (!localFS.exists(HDFS_WRITE)) { + FileUtil.copy(fs, HDFS_WRITE, localFS, HDFS_WRITE, false, fsConfig); + + String chmodCmd = new String(CHMOD + " a+x " + HDFS_WRITE); + Process process = runTime.exec(chmodCmd); + int exitStatus = process.waitFor(); + if (exitStatus != 0) { + throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); + } + } + } + + // exec the C program + Path outFile = new Path(DATA_DIR, name); + String writeCmd = new String(HDFS_WRITE + " " + outFile + " " + totalSize + " " + bufferSize); + Process process = runTime.exec(writeCmd, null, new File(HDFS_TEST_DIR.toString())); + int exitStatus = process.waitFor(); + if (exitStatus != 0) { + throw new IOException(writeCmd + ": Failed with exitStatus: " + exitStatus); + } + } catch (InterruptedException interruptedException) { + reporter.setStatus(interruptedException.toString()); + } finally { + localFS.close(); + } + return new Long(totalSize); + } + } + + private static void writeTest(FileSystem fs) + throws IOException { + + fs.delete(DATA_DIR, true); + fs.delete(WRITE_DIR, true); + + runIOTest(WriteMapper.class, WRITE_DIR); + } + + private static void runIOTest( Class mapperClass, + Path outputDir + ) throws IOException { + JobConf job = new JobConf(fsConfig, 
DFSCIOTest.class); + + FileInputFormat.setInputPaths(job, CONTROL_DIR); + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(mapperClass); + job.setReducerClass(AccumulatingReducer.class); + + FileOutputFormat.setOutputPath(job, outputDir); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(1); + JobClient.runJob(job); + } + + /** + * Read mapper class. + */ + public static class ReadMapper extends IOStatMapper { + + public ReadMapper() { + super(); + } + + public Object doIO(Reporter reporter, + String name, + long totalSize + ) throws IOException { + totalSize *= MEGA; + + // create instance of local filesystem + FileSystem localFS = FileSystem.getLocal(fsConfig); + + try { + // native runtime + Runtime runTime = Runtime.getRuntime(); + + // copy the dso and executable from dfs + synchronized (this) { + localFS.delete(HDFS_TEST_DIR, true); + if (!(localFS.mkdirs(HDFS_TEST_DIR))) { + throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); + } + } + + synchronized (this) { + if (!localFS.exists(HDFS_SHLIB)) { + if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) { + throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem"); + } + + String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB); + Process process = runTime.exec(chmodCmd); + int exitStatus = process.waitFor(); + if (exitStatus != 0) { + throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); + } + } + } + + synchronized (this) { + if (!localFS.exists(HDFS_READ)) { + if (!FileUtil.copy(fs, HDFS_READ, localFS, HDFS_READ, false, fsConfig)) { + throw new IOException("Failed to copy " + HDFS_READ + " to local filesystem"); + } + + String chmodCmd = new String(CHMOD + " a+x " + HDFS_READ); + Process process = runTime.exec(chmodCmd); + int exitStatus = process.waitFor(); + + if (exitStatus != 0) { + throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); + } + } + } + + // exec the C program + Path inFile = new Path(DATA_DIR, name); + String readCmd = new String(HDFS_READ + " " + inFile + " " + totalSize + " " + + bufferSize); + Process process = runTime.exec(readCmd, null, new File(HDFS_TEST_DIR.toString())); + int exitStatus = process.waitFor(); + + if (exitStatus != 0) { + throw new IOException(HDFS_READ + ": Failed with exitStatus: " + exitStatus); + } + } catch (InterruptedException interruptedException) { + reporter.setStatus(interruptedException.toString()); + } finally { + localFS.close(); + } + return new Long(totalSize); + } + } + + private static void readTest(FileSystem fs) throws IOException { + fs.delete(READ_DIR, true); + runIOTest(ReadMapper.class, READ_DIR); + } + + private static void sequentialTest( + FileSystem fs, + int testType, + int fileSize, + int nrFiles + ) throws Exception { + IOStatMapper ioer = null; + if (testType == TEST_TYPE_READ) + ioer = new ReadMapper(); + else if (testType == TEST_TYPE_WRITE) + ioer = new WriteMapper(); + else + return; + for(int i=0; i < nrFiles; i++) + ioer.doIO(Reporter.NULL, + BASE_FILE_NAME+Integer.toString(i), + MEGA*fileSize); + } + + public static void main(String[] args) { + int testType = TEST_TYPE_READ; + int bufferSize = DEFAULT_BUFFER_SIZE; + int fileSize = 1; + int nrFiles = 1; + String resFileName = DEFAULT_RES_FILE_NAME; + boolean isSequential = false; + + String version="DFSCIOTest.0.0.1"; + String usage = "Usage: DFSCIOTest -read | -write | -clean [-nrFiles N] [-fileSize MB] 
[-resFile resultFileName] [-bufferSize Bytes] "; + + System.out.println(version); + if (args.length == 0) { + System.err.println(usage); + System.exit(-1); + } + for (int i = 0; i < args.length; i++) { // parse command line + if (args[i].startsWith("-r")) { + testType = TEST_TYPE_READ; + } else if (args[i].startsWith("-w")) { + testType = TEST_TYPE_WRITE; + } else if (args[i].startsWith("-clean")) { + testType = TEST_TYPE_CLEANUP; + } else if (args[i].startsWith("-seq")) { + isSequential = true; + } else if (args[i].equals("-nrFiles")) { + nrFiles = Integer.parseInt(args[++i]); + } else if (args[i].equals("-fileSize")) { + fileSize = Integer.parseInt(args[++i]); + } else if (args[i].equals("-bufferSize")) { + bufferSize = Integer.parseInt(args[++i]); + } else if (args[i].equals("-resFile")) { + resFileName = args[++i]; + } + } + + LOG.info("nrFiles = " + nrFiles); + LOG.info("fileSize (MB) = " + fileSize); + LOG.info("bufferSize = " + bufferSize); + + try { + fsConfig.setInt("test.io.file.buffer.size", bufferSize); + FileSystem fs = FileSystem.get(fsConfig); + + if (testType != TEST_TYPE_CLEANUP) { + fs.delete(HDFS_TEST_DIR, true); + if (!fs.mkdirs(HDFS_TEST_DIR)) { + throw new IOException("Mkdirs failed to create " + + HDFS_TEST_DIR.toString()); + } + + //Copy the executables over to the remote filesystem + String hadoopHome = System.getenv("HADOOP_HOME"); + fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/libhdfs.so." + HDFS_LIB_VERSION), + HDFS_SHLIB); + fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_read"), HDFS_READ); + fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_write"), HDFS_WRITE); + } + + if (isSequential) { + long tStart = System.currentTimeMillis(); + sequentialTest(fs, testType, fileSize, nrFiles); + long execTime = System.currentTimeMillis() - tStart; + String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; + LOG.info(resultLine); + return; + } + if (testType == TEST_TYPE_CLEANUP) { + cleanup(fs); + return; + } + createControlFile(fs, fileSize, nrFiles); + long tStart = System.currentTimeMillis(); + if (testType == TEST_TYPE_WRITE) + writeTest(fs); + if (testType == TEST_TYPE_READ) + readTest(fs); + long execTime = System.currentTimeMillis() - tStart; + + analyzeResult(fs, testType, execTime, resFileName); + } catch(Exception e) { + System.err.print(e.getLocalizedMessage()); + System.exit(-1); + } + } + + private static void analyzeResult( FileSystem fs, + int testType, + long execTime, + String resFileName + ) throws IOException { + Path reduceFile; + if (testType == TEST_TYPE_WRITE) + reduceFile = new Path(WRITE_DIR, "part-00000"); + else + reduceFile = new Path(READ_DIR, "part-00000"); + DataInputStream in; + in = new DataInputStream(fs.open(reduceFile)); + + BufferedReader lines; + lines = new BufferedReader(new InputStreamReader(in)); + long tasks = 0; + long size = 0; + long time = 0; + float rate = 0; + float sqrate = 0; + String line; + while((line = lines.readLine()) != null) { + StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); + String attr = tokens.nextToken(); + if (attr.endsWith(":tasks")) + tasks = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":size")) + size = Long.parseLong(tokens. 
nextToken()); + else if (attr.endsWith(":time")) + time = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":rate")) + rate = Float.parseFloat(tokens.nextToken()); + else if (attr.endsWith(":sqrate")) + sqrate = Float.parseFloat(tokens.nextToken()); + } + + double med = rate / 1000 / tasks; + double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med)); + String resultLines[] = { + "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : + (testType == TEST_TYPE_READ) ? "read" : + "unknown"), + " Date & time: " + new Date(System.currentTimeMillis()), + " Number of files: " + tasks, + "Total MBytes processed: " + size/MEGA, + " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), + "Average IO rate mb/sec: " + med, + " Std IO rate deviation: " + stdDev, + " Test exec time sec: " + (float)execTime / 1000, + "" }; + + PrintStream res = new PrintStream( + new FileOutputStream( + new File(resFileName), true)); + for(int i = 0; i < resultLines.length; i++) { + LOG.info(resultLines[i]); + res.println(resultLines[i]); + } + } + + private static void cleanup(FileSystem fs) throws Exception { + LOG.info("Cleaning up test files"); + fs.delete(new Path(TEST_ROOT_DIR), true); + fs.delete(HDFS_TEST_DIR, true); + } +} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java new file mode 100644 index 0000000000..21d9cdfb93 --- /dev/null +++ b/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.util.Date; +import java.util.StringTokenizer; +import java.util.TreeSet; +import java.util.Vector; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.*; + +/** + * Distributed checkup of the file system consistency. + *

+ * Test file system consistency by reading each block of each file
+ * of the specified file tree.
+ * Report corrupted blocks and general file statistics.
+ * <p>
+ * Optionally displays statistics on read performance. + * + */ +public class DistributedFSCheck extends TestCase { + // Constants + private static final Log LOG = LogFactory.getLog(DistributedFSCheck.class); + private static final int TEST_TYPE_READ = 0; + private static final int TEST_TYPE_CLEANUP = 2; + private static final int DEFAULT_BUFFER_SIZE = 1000000; + private static final String DEFAULT_RES_FILE_NAME = "DistributedFSCheck_results.log"; + private static final long MEGA = 0x100000; + + private static Configuration fsConfig = new Configuration(); + private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck")); + private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input"); + private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); + + private FileSystem fs; + private long nrFiles; + + DistributedFSCheck(Configuration conf) throws Exception { + fsConfig = conf; + this.fs = FileSystem.get(conf); + } + + /** + * Run distributed checkup for the entire files system. + * + * @throws Exception + */ + public void testFSBlocks() throws Exception { + testFSBlocks("/"); + } + + /** + * Run distributed checkup for the specified directory. + * + * @param rootName root directory name + * @throws Exception + */ + public void testFSBlocks(String rootName) throws Exception { + createInputFile(rootName); + runDistributedFSCheck(); + cleanup(); // clean up after all to restore the system state + } + + private void createInputFile(String rootName) throws IOException { + cleanup(); // clean up if previous run failed + + Path inputFile = new Path(MAP_INPUT_DIR, "in_file"); + SequenceFile.Writer writer = + SequenceFile.createWriter(fs, fsConfig, inputFile, + Text.class, LongWritable.class, CompressionType.NONE); + + try { + nrFiles = 0; + listSubtree(new Path(rootName), writer); + } finally { + writer.close(); + } + LOG.info("Created map input files."); + } + + private void listSubtree(Path rootFile, + SequenceFile.Writer writer + ) throws IOException { + FileStatus rootStatus = fs.getFileStatus(rootFile); + listSubtree(rootStatus, writer); + } + + private void listSubtree(FileStatus rootStatus, + SequenceFile.Writer writer + ) throws IOException { + Path rootFile = rootStatus.getPath(); + if (!rootStatus.isDir()) { + nrFiles++; + // For a regular file generate pairs + long blockSize = fs.getDefaultBlockSize(); + long fileLength = rootStatus.getLen(); + for(long offset = 0; offset < fileLength; offset += blockSize) + writer.append(new Text(rootFile.toString()), new LongWritable(offset)); + return; + } + + FileStatus children[] = fs.listStatus(rootFile); + if (children == null) + throw new IOException("Could not get listing for " + rootFile); + for (int i = 0; i < children.length; i++) + listSubtree(children[i], writer); + } + + /** + * DistributedFSCheck mapper class. 
+ */ + public static class DistributedFSCheckMapper extends IOMapperBase { + + public DistributedFSCheckMapper() { + super(fsConfig); + } + + public Object doIO(Reporter reporter, + String name, + long offset + ) throws IOException { + // open file + FSDataInputStream in = null; + try { + in = fs.open(new Path(name)); + } catch(IOException e) { + return name + "@(missing)"; + } + in.seek(offset); + long actualSize = 0; + try { + long blockSize = fs.getDefaultBlockSize(); + reporter.setStatus("reading " + name + "@" + + offset + "/" + blockSize); + for( int curSize = bufferSize; + curSize == bufferSize && actualSize < blockSize; + actualSize += curSize) { + curSize = in.read(buffer, 0, bufferSize); + } + } catch(IOException e) { + LOG.info("Corrupted block detected in \"" + name + "\" at " + offset); + return name + "@" + offset; + } finally { + in.close(); + } + return new Long(actualSize); + } + + void collectStats(OutputCollector output, + String name, + long execTime, + Object corruptedBlock) throws IOException { + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "blocks"), + new Text(String.valueOf(1))); + + if (corruptedBlock.getClass().getName().endsWith("String")) { + output.collect( + new Text(AccumulatingReducer.VALUE_TYPE_STRING + "badBlocks"), + new Text((String)corruptedBlock)); + return; + } + long totalSize = ((Long)corruptedBlock).longValue(); + float ioRateMbSec = (float)totalSize * 1000 / (execTime * 0x100000); + LOG.info("Number of bytes processed = " + totalSize); + LOG.info("Exec time = " + execTime); + LOG.info("IO rate = " + ioRateMbSec); + + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), + new Text(String.valueOf(totalSize))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), + new Text(String.valueOf(execTime))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), + new Text(String.valueOf(ioRateMbSec*1000))); + } + } + + private void runDistributedFSCheck() throws Exception { + JobConf job = new JobConf(fs.getConf(), DistributedFSCheck.class); + + FileInputFormat.setInputPaths(job, MAP_INPUT_DIR); + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(DistributedFSCheckMapper.class); + job.setReducerClass(AccumulatingReducer.class); + + FileOutputFormat.setOutputPath(job, READ_DIR); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(1); + JobClient.runJob(job); + } + + public static void main(String[] args) throws Exception { + int testType = TEST_TYPE_READ; + int bufferSize = DEFAULT_BUFFER_SIZE; + String resFileName = DEFAULT_RES_FILE_NAME; + String rootName = "/"; + boolean viewStats = false; + + String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] "; + + if (args.length == 1 && args[0].startsWith("-h")) { + System.err.println(usage); + System.exit(-1); + } + for(int i = 0; i < args.length; i++) { // parse command line + if (args[i].equals("-root")) { + rootName = args[++i]; + } else if (args[i].startsWith("-clean")) { + testType = TEST_TYPE_CLEANUP; + } else if (args[i].equals("-bufferSize")) { + bufferSize = Integer.parseInt(args[++i]); + } else if (args[i].equals("-resFile")) { + resFileName = args[++i]; + } else if (args[i].startsWith("-stat")) { + viewStats = true; + } + } + + LOG.info("root = " + rootName); + LOG.info("bufferSize = " + bufferSize); + + Configuration conf = new Configuration(); + conf.setInt("test.io.file.buffer.size", 
bufferSize); + DistributedFSCheck test = new DistributedFSCheck(conf); + + if (testType == TEST_TYPE_CLEANUP) { + test.cleanup(); + return; + } + test.createInputFile(rootName); + long tStart = System.currentTimeMillis(); + test.runDistributedFSCheck(); + long execTime = System.currentTimeMillis() - tStart; + + test.analyzeResult(execTime, resFileName, viewStats); + // test.cleanup(); // clean up after all to restore the system state + } + + private void analyzeResult(long execTime, + String resFileName, + boolean viewStats + ) throws IOException { + Path reduceFile= new Path(READ_DIR, "part-00000"); + DataInputStream in; + in = new DataInputStream(fs.open(reduceFile)); + + BufferedReader lines; + lines = new BufferedReader(new InputStreamReader(in)); + long blocks = 0; + long size = 0; + long time = 0; + float rate = 0; + StringTokenizer badBlocks = null; + long nrBadBlocks = 0; + String line; + while((line = lines.readLine()) != null) { + StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); + String attr = tokens.nextToken(); + if (attr.endsWith("blocks")) + blocks = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith("size")) + size = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith("time")) + time = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith("rate")) + rate = Float.parseFloat(tokens.nextToken()); + else if (attr.endsWith("badBlocks")) { + badBlocks = new StringTokenizer(tokens.nextToken(), ";"); + nrBadBlocks = badBlocks.countTokens(); + } + } + + Vector resultLines = new Vector(); + resultLines.add( "----- DistributedFSCheck ----- : "); + resultLines.add( " Date & time: " + new Date(System.currentTimeMillis())); + resultLines.add( " Total number of blocks: " + blocks); + resultLines.add( " Total number of files: " + nrFiles); + resultLines.add( "Number of corrupted blocks: " + nrBadBlocks); + + int nrBadFilesPos = resultLines.size(); + TreeSet badFiles = new TreeSet(); + long nrBadFiles = 0; + if (nrBadBlocks > 0) { + resultLines.add(""); + resultLines.add("----- Corrupted Blocks (file@offset) ----- : "); + while(badBlocks.hasMoreTokens()) { + String curBlock = badBlocks.nextToken(); + resultLines.add(curBlock); + badFiles.add(curBlock.substring(0, curBlock.indexOf('@'))); + } + nrBadFiles = badFiles.size(); + } + + resultLines.insertElementAt(" Number of corrupted files: " + nrBadFiles, nrBadFilesPos); + + if (viewStats) { + resultLines.add(""); + resultLines.add("----- Performance ----- : "); + resultLines.add(" Total MBytes read: " + size/MEGA); + resultLines.add(" Throughput mb/sec: " + (float)size * 1000.0 / (time * MEGA)); + resultLines.add(" Average IO rate mb/sec: " + rate / 1000 / blocks); + resultLines.add(" Test exec time sec: " + (float)execTime / 1000); + } + + PrintStream res = new PrintStream( + new FileOutputStream( + new File(resFileName), true)); + for(int i = 0; i < resultLines.size(); i++) { + String cur = resultLines.get(i); + LOG.info(cur); + res.println(cur); + } + } + + private void cleanup() throws IOException { + LOG.info("Cleaning up test files"); + fs.delete(TEST_ROOT_DIR, true); + } +} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java new file mode 100644 index 0000000000..672bf89594 --- /dev/null +++ b/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.net.InetAddress; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +/** + * Base mapper class for IO operations. + *

+ * Two abstract method {@link #doIO(Reporter, String, long)} and + * {@link #collectStats(OutputCollector,String,long,Object)} should be + * overloaded in derived classes to define the IO operation and the + * statistics data to be collected by subsequent reducers. + * + */ +public abstract class IOMapperBase extends Configured + implements Mapper { + + protected byte[] buffer; + protected int bufferSize; + protected FileSystem fs; + protected String hostName; + + public IOMapperBase(Configuration conf) { + super(conf); + try { + fs = FileSystem.get(conf); + } catch (Exception e) { + throw new RuntimeException("Cannot create file system.", e); + } + bufferSize = conf.getInt("test.io.file.buffer.size", 4096); + buffer = new byte[bufferSize]; + try { + hostName = InetAddress.getLocalHost().getHostName(); + } catch(Exception e) { + hostName = "localhost"; + } + } + + public void configure(JobConf job) { + setConf(job); + } + + public void close() throws IOException { + } + + /** + * Perform io operation, usually read or write. + * + * @param reporter + * @param name file name + * @param value offset within the file + * @return object that is passed as a parameter to + * {@link #collectStats(OutputCollector,String,long,Object)} + * @throws IOException + */ + abstract Object doIO(Reporter reporter, + String name, + long value) throws IOException; + + /** + * Collect stat data to be combined by a subsequent reducer. + * + * @param output + * @param name file name + * @param execTime IO execution time + * @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)} + * @throws IOException + */ + abstract void collectStats(OutputCollector output, + String name, + long execTime, + Object doIOReturnValue) throws IOException; + + /** + * Map file name and offset into statistical data. + *

+ * The map task is to get the + * key, which contains the file name, and the + * value, which is the offset within the file. + * + * The parameters are passed to the abstract method + * {@link #doIO(Reporter,String,long)}, which performs the io operation, + * usually read or write data, and then + * {@link #collectStats(OutputCollector,String,long,Object)} + * is called to prepare stat data for a subsequent reducer. + */ + public void map(Text key, + LongWritable value, + OutputCollector output, + Reporter reporter) throws IOException { + String name = key.toString(); + long longValue = value.get(); + + reporter.setStatus("starting " + name + " ::host = " + hostName); + + long tStart = System.currentTimeMillis(); + Object statValue = doIO(reporter, name, longValue); + long tEnd = System.currentTimeMillis(); + long execTime = tEnd - tStart; + collectStats(output, name, execTime, statValue); + + reporter.setStatus("finished " + name + " ::host = " + hostName); + } +} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestCopyFiles.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestCopyFiles.java new file mode 100644 index 0000000000..a64d24dcaf --- /dev/null +++ b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestCopyFiles.java @@ -0,0 +1,853 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.StringTokenizer; + +import junit.framework.TestCase; + +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Level; + + +/** + * A JUnit test for copying files recursively. 
+ */ +public class TestCopyFiles extends TestCase { + { + ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange") + ).getLogger().setLevel(Level.OFF); + ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF); + ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF); + ((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL); + } + + static final URI LOCAL_FS = URI.create("file:///"); + + private static final Random RAN = new Random(); + private static final int NFILES = 20; + private static String TEST_ROOT_DIR = + new Path(System.getProperty("test.build.data","/tmp")) + .toString().replace(' ', '+'); + + /** class MyFile contains enough information to recreate the contents of + * a single file. + */ + private static class MyFile { + private static Random gen = new Random(); + private static final int MAX_LEVELS = 3; + private static final int MAX_SIZE = 8*1024; + private static String[] dirNames = { + "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" + }; + private final String name; + private int size = 0; + private long seed = 0L; + + MyFile() { + this(gen.nextInt(MAX_LEVELS)); + } + MyFile(int nLevels) { + String xname = ""; + if (nLevels != 0) { + int[] levels = new int[nLevels]; + for (int idx = 0; idx < nLevels; idx++) { + levels[idx] = gen.nextInt(10); + } + StringBuffer sb = new StringBuffer(); + for (int idx = 0; idx < nLevels; idx++) { + sb.append(dirNames[levels[idx]]); + sb.append("/"); + } + xname = sb.toString(); + } + long fidx = gen.nextLong() & Long.MAX_VALUE; + name = xname + Long.toString(fidx); + reset(); + } + void reset() { + final int oldsize = size; + do { size = gen.nextInt(MAX_SIZE); } while (oldsize == size); + final long oldseed = seed; + do { seed = gen.nextLong() & Long.MAX_VALUE; } while (oldseed == seed); + } + String getName() { return name; } + int getSize() { return size; } + long getSeed() { return seed; } + } + + private static MyFile[] createFiles(URI fsname, String topdir) + throws IOException { + return createFiles(FileSystem.get(fsname, new Configuration()), topdir); + } + + /** create NFILES with random names and directory hierarchies + * with random (but reproducible) data in them. + */ + private static MyFile[] createFiles(FileSystem fs, String topdir) + throws IOException { + Path root = new Path(topdir); + MyFile[] files = new MyFile[NFILES]; + for (int i = 0; i < NFILES; i++) { + files[i] = createFile(root, fs); + } + return files; + } + + static MyFile createFile(Path root, FileSystem fs, int levels) + throws IOException { + MyFile f = levels < 0 ? 
new MyFile() : new MyFile(levels); + Path p = new Path(root, f.getName()); + FSDataOutputStream out = fs.create(p); + byte[] toWrite = new byte[f.getSize()]; + new Random(f.getSeed()).nextBytes(toWrite); + out.write(toWrite); + out.close(); + FileSystem.LOG.info("created: " + p + ", size=" + f.getSize()); + return f; + } + + static MyFile createFile(Path root, FileSystem fs) throws IOException { + return createFile(root, fs, -1); + } + + private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files + ) throws IOException { + return checkFiles(fs, topdir, files, false); + } + + private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files, + boolean existingOnly) throws IOException { + Path root = new Path(topdir); + + for (int idx = 0; idx < files.length; idx++) { + Path fPath = new Path(root, files[idx].getName()); + try { + fs.getFileStatus(fPath); + FSDataInputStream in = fs.open(fPath); + byte[] toRead = new byte[files[idx].getSize()]; + byte[] toCompare = new byte[files[idx].getSize()]; + Random rb = new Random(files[idx].getSeed()); + rb.nextBytes(toCompare); + assertEquals("Cannnot read file.", toRead.length, in.read(toRead)); + in.close(); + for (int i = 0; i < toRead.length; i++) { + if (toRead[i] != toCompare[i]) { + return false; + } + } + toRead = null; + toCompare = null; + } + catch(FileNotFoundException fnfe) { + if (!existingOnly) { + throw fnfe; + } + } + } + + return true; + } + + private static void updateFiles(FileSystem fs, String topdir, MyFile[] files, + int nupdate) throws IOException { + assert nupdate <= NFILES; + + Path root = new Path(topdir); + + for (int idx = 0; idx < nupdate; ++idx) { + Path fPath = new Path(root, files[idx].getName()); + // overwrite file + assertTrue(fPath.toString() + " does not exist", fs.exists(fPath)); + FSDataOutputStream out = fs.create(fPath); + files[idx].reset(); + byte[] toWrite = new byte[files[idx].getSize()]; + Random rb = new Random(files[idx].getSeed()); + rb.nextBytes(toWrite); + out.write(toWrite); + out.close(); + } + } + + private static FileStatus[] getFileStatus(FileSystem fs, + String topdir, MyFile[] files) throws IOException { + return getFileStatus(fs, topdir, files, false); + } + private static FileStatus[] getFileStatus(FileSystem fs, + String topdir, MyFile[] files, boolean existingOnly) throws IOException { + Path root = new Path(topdir); + List statuses = new ArrayList(); + for (int idx = 0; idx < NFILES; ++idx) { + try { + statuses.add(fs.getFileStatus(new Path(root, files[idx].getName()))); + } catch(FileNotFoundException fnfe) { + if (!existingOnly) { + throw fnfe; + } + } + } + return statuses.toArray(new FileStatus[statuses.size()]); + } + + private static boolean checkUpdate(FileSystem fs, FileStatus[] old, + String topdir, MyFile[] upd, final int nupdate) throws IOException { + Path root = new Path(topdir); + + // overwrote updated files + for (int idx = 0; idx < nupdate; ++idx) { + final FileStatus stat = + fs.getFileStatus(new Path(root, upd[idx].getName())); + if (stat.getModificationTime() <= old[idx].getModificationTime()) { + return false; + } + } + // did not overwrite files not updated + for (int idx = nupdate; idx < NFILES; ++idx) { + final FileStatus stat = + fs.getFileStatus(new Path(root, upd[idx].getName())); + if (stat.getModificationTime() != old[idx].getModificationTime()) { + return false; + } + } + return true; + } + + /** delete directory and everything underneath it.*/ + private static void deldir(FileSystem fs, String topdir) throws IOException { + 
fs.delete(new Path(topdir), true); + } + + /** copy files from local file system to local file system */ + public void testCopyFromLocalToLocal() throws Exception { + Configuration conf = new Configuration(); + FileSystem localfs = FileSystem.get(LOCAL_FS, conf); + MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat"); + ToolRunner.run(new DistCp(new Configuration()), + new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", + "file:///"+TEST_ROOT_DIR+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files)); + deldir(localfs, TEST_ROOT_DIR+"/destdat"); + deldir(localfs, TEST_ROOT_DIR+"/srcdat"); + } + + /** copy files from dfs file system to dfs file system */ + public void testCopyFromDfsToDfs() throws Exception { + String namenode = null; + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster(conf, 2, true, null); + final FileSystem hdfs = cluster.getFileSystem(); + namenode = FileSystem.getDefaultUri(conf).toString(); + if (namenode.startsWith("hdfs://")) { + MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); + ToolRunner.run(new DistCp(conf), new String[] { + "-log", + namenode+"/logs", + namenode+"/srcdat", + namenode+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(hdfs, "/destdat", files)); + FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf); + assertTrue("Log directory does not exist.", + fs.exists(new Path(namenode+"/logs"))); + deldir(hdfs, "/destdat"); + deldir(hdfs, "/srcdat"); + deldir(hdfs, "/logs"); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } + + /** copy files from local file system to dfs file system */ + public void testCopyFromLocalToDfs() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster(conf, 1, true, null); + final FileSystem hdfs = cluster.getFileSystem(); + final String namenode = hdfs.getUri().toString(); + if (namenode.startsWith("hdfs://")) { + MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat"); + ToolRunner.run(new DistCp(conf), new String[] { + "-log", + namenode+"/logs", + "file:///"+TEST_ROOT_DIR+"/srcdat", + namenode+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(cluster.getFileSystem(), "/destdat", files)); + assertTrue("Log directory does not exist.", + hdfs.exists(new Path(namenode+"/logs"))); + deldir(hdfs, "/destdat"); + deldir(hdfs, "/logs"); + deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat"); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } + + /** copy files from dfs file system to local file system */ + public void testCopyFromDfsToLocal() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + final FileSystem localfs = FileSystem.get(LOCAL_FS, conf); + cluster = new MiniDFSCluster(conf, 1, true, null); + final FileSystem hdfs = cluster.getFileSystem(); + final String namenode = FileSystem.getDefaultUri(conf).toString(); + if (namenode.startsWith("hdfs://")) { + MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); + ToolRunner.run(new DistCp(conf), new String[] { + "-log", + "/logs", + namenode+"/srcdat", + "file:///"+TEST_ROOT_DIR+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files)); + 
assertTrue("Log directory does not exist.", + hdfs.exists(new Path("/logs"))); + deldir(localfs, TEST_ROOT_DIR+"/destdat"); + deldir(hdfs, "/logs"); + deldir(hdfs, "/srcdat"); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } + + public void testCopyDfsToDfsUpdateOverwrite() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster(conf, 2, true, null); + final FileSystem hdfs = cluster.getFileSystem(); + final String namenode = hdfs.getUri().toString(); + if (namenode.startsWith("hdfs://")) { + MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); + ToolRunner.run(new DistCp(conf), new String[] { + "-p", + "-log", + namenode+"/logs", + namenode+"/srcdat", + namenode+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(hdfs, "/destdat", files)); + FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf); + assertTrue("Log directory does not exist.", + fs.exists(new Path(namenode+"/logs"))); + + FileStatus[] dchkpoint = getFileStatus(hdfs, "/destdat", files); + final int nupdate = NFILES>>2; + updateFiles(cluster.getFileSystem(), "/srcdat", files, nupdate); + deldir(hdfs, "/logs"); + + ToolRunner.run(new DistCp(conf), new String[] { + "-p", + "-update", + "-log", + namenode+"/logs", + namenode+"/srcdat", + namenode+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(hdfs, "/destdat", files)); + assertTrue("Update failed to replicate all changes in src", + checkUpdate(hdfs, dchkpoint, "/destdat", files, nupdate)); + + deldir(hdfs, "/logs"); + ToolRunner.run(new DistCp(conf), new String[] { + "-p", + "-overwrite", + "-log", + namenode+"/logs", + namenode+"/srcdat", + namenode+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(hdfs, "/destdat", files)); + assertTrue("-overwrite didn't.", + checkUpdate(hdfs, dchkpoint, "/destdat", files, NFILES)); + + deldir(hdfs, "/destdat"); + deldir(hdfs, "/srcdat"); + deldir(hdfs, "/logs"); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } + + public void testCopyDuplication() throws Exception { + final FileSystem localfs = FileSystem.get(LOCAL_FS, new Configuration()); + try { + MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat"); + ToolRunner.run(new DistCp(new Configuration()), + new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", + "file:///"+TEST_ROOT_DIR+"/src2/srcdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(localfs, TEST_ROOT_DIR+"/src2/srcdat", files)); + + assertEquals(DistCp.DuplicationException.ERROR_CODE, + ToolRunner.run(new DistCp(new Configuration()), + new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", + "file:///"+TEST_ROOT_DIR+"/src2/srcdat", + "file:///"+TEST_ROOT_DIR+"/destdat",})); + } + finally { + deldir(localfs, TEST_ROOT_DIR+"/destdat"); + deldir(localfs, TEST_ROOT_DIR+"/srcdat"); + deldir(localfs, TEST_ROOT_DIR+"/src2"); + } + } + + public void testCopySingleFile() throws Exception { + FileSystem fs = FileSystem.get(LOCAL_FS, new Configuration()); + Path root = new Path(TEST_ROOT_DIR+"/srcdat"); + try { + MyFile[] files = {createFile(root, fs)}; + //copy a dir with a single file + ToolRunner.run(new DistCp(new Configuration()), + new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", + "file:///"+TEST_ROOT_DIR+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(fs, TEST_ROOT_DIR+"/destdat", 
files)); + + //copy a single file + String fname = files[0].getName(); + Path p = new Path(root, fname); + FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p)); + ToolRunner.run(new DistCp(new Configuration()), + new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname, + "file:///"+TEST_ROOT_DIR+"/dest2/"+fname}); + assertTrue("Source and destination directories do not match.", + checkFiles(fs, TEST_ROOT_DIR+"/dest2", files)); + //copy single file to existing dir + deldir(fs, TEST_ROOT_DIR+"/dest2"); + fs.mkdirs(new Path(TEST_ROOT_DIR+"/dest2")); + MyFile[] files2 = {createFile(root, fs, 0)}; + String sname = files2[0].getName(); + ToolRunner.run(new DistCp(new Configuration()), + new String[] {"-update", + "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname, + "file:///"+TEST_ROOT_DIR+"/dest2/"}); + assertTrue("Source and destination directories do not match.", + checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2)); + updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1); + //copy single file to existing dir w/ dst name conflict + ToolRunner.run(new DistCp(new Configuration()), + new String[] {"-update", + "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname, + "file:///"+TEST_ROOT_DIR+"/dest2/"}); + assertTrue("Source and destination directories do not match.", + checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2)); + } + finally { + deldir(fs, TEST_ROOT_DIR+"/destdat"); + deldir(fs, TEST_ROOT_DIR+"/dest2"); + deldir(fs, TEST_ROOT_DIR+"/srcdat"); + } + } + + public void testPreserveOption() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster(conf, 2, true, null); + String nnUri = FileSystem.getDefaultUri(conf).toString(); + FileSystem fs = FileSystem.get(URI.create(nnUri), conf); + + {//test preserving user + MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); + FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); + for(int i = 0; i < srcstat.length; i++) { + fs.setOwner(srcstat[i].getPath(), "u" + i, null); + } + ToolRunner.run(new DistCp(conf), + new String[]{"-pu", nnUri+"/srcdat", nnUri+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(fs, "/destdat", files)); + + FileStatus[] dststat = getFileStatus(fs, "/destdat", files); + for(int i = 0; i < dststat.length; i++) { + assertEquals("i=" + i, "u" + i, dststat[i].getOwner()); + } + deldir(fs, "/destdat"); + deldir(fs, "/srcdat"); + } + + {//test preserving group + MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); + FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); + for(int i = 0; i < srcstat.length; i++) { + fs.setOwner(srcstat[i].getPath(), null, "g" + i); + } + ToolRunner.run(new DistCp(conf), + new String[]{"-pg", nnUri+"/srcdat", nnUri+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(fs, "/destdat", files)); + + FileStatus[] dststat = getFileStatus(fs, "/destdat", files); + for(int i = 0; i < dststat.length; i++) { + assertEquals("i=" + i, "g" + i, dststat[i].getGroup()); + } + deldir(fs, "/destdat"); + deldir(fs, "/srcdat"); + } + + {//test preserving mode + MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); + FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); + FsPermission[] permissions = new FsPermission[srcstat.length]; + for(int i = 0; i < srcstat.length; i++) { + permissions[i] = new FsPermission((short)(i & 0666)); + fs.setPermission(srcstat[i].getPath(), permissions[i]); + } + + ToolRunner.run(new DistCp(conf), + new 
String[]{"-pp", nnUri+"/srcdat", nnUri+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(fs, "/destdat", files)); + + FileStatus[] dststat = getFileStatus(fs, "/destdat", files); + for(int i = 0; i < dststat.length; i++) { + assertEquals("i=" + i, permissions[i], dststat[i].getPermission()); + } + deldir(fs, "/destdat"); + deldir(fs, "/srcdat"); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } + + public void testMapCount() throws Exception { + String namenode = null; + MiniDFSCluster dfs = null; + MiniMRCluster mr = null; + try { + Configuration conf = new Configuration(); + dfs = new MiniDFSCluster(conf, 3, true, null); + FileSystem fs = dfs.getFileSystem(); + final FsShell shell = new FsShell(conf); + namenode = fs.getUri().toString(); + mr = new MiniMRCluster(3, namenode, 1); + MyFile[] files = createFiles(fs.getUri(), "/srcdat"); + long totsize = 0; + for (MyFile f : files) { + totsize += f.getSize(); + } + Configuration job = mr.createJobConf(); + job.setLong("distcp.bytes.per.map", totsize / 3); + ToolRunner.run(new DistCp(job), + new String[] {"-m", "100", + "-log", + namenode+"/logs", + namenode+"/srcdat", + namenode+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(fs, "/destdat", files)); + + String logdir = namenode + "/logs"; + System.out.println(execCmd(shell, "-lsr", logdir)); + FileStatus[] logs = fs.listStatus(new Path(logdir)); + // rare case where splits are exact, logs.length can be 4 + assertTrue("Unexpected map count, logs.length=" + logs.length, + logs.length == 5 || logs.length == 4); + + deldir(fs, "/destdat"); + deldir(fs, "/logs"); + ToolRunner.run(new DistCp(job), + new String[] {"-m", "1", + "-log", + namenode+"/logs", + namenode+"/srcdat", + namenode+"/destdat"}); + + System.out.println(execCmd(shell, "-lsr", logdir)); + logs = fs.listStatus(new Path(namenode+"/logs")); + assertTrue("Unexpected map count, logs.length=" + logs.length, + logs.length == 2); + } finally { + if (dfs != null) { dfs.shutdown(); } + if (mr != null) { mr.shutdown(); } + } + } + + public void testLimits() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster(conf, 2, true, null); + final String nnUri = FileSystem.getDefaultUri(conf).toString(); + final FileSystem fs = FileSystem.get(URI.create(nnUri), conf); + final DistCp distcp = new DistCp(conf); + final FsShell shell = new FsShell(conf); + + final String srcrootdir = "/src_root"; + final Path srcrootpath = new Path(srcrootdir); + final String dstrootdir = "/dst_root"; + final Path dstrootpath = new Path(dstrootdir); + + {//test -filelimit + MyFile[] files = createFiles(URI.create(nnUri), srcrootdir); + int filelimit = files.length / 2; + System.out.println("filelimit=" + filelimit); + + ToolRunner.run(distcp, + new String[]{"-filelimit", ""+filelimit, nnUri+srcrootdir, nnUri+dstrootdir}); + String results = execCmd(shell, "-lsr", dstrootdir); + results = removePrefix(results, dstrootdir); + System.out.println("results=" + results); + + FileStatus[] dststat = getFileStatus(fs, dstrootdir, files, true); + assertEquals(filelimit, dststat.length); + deldir(fs, dstrootdir); + deldir(fs, srcrootdir); + } + + {//test -sizelimit + createFiles(URI.create(nnUri), srcrootdir); + long sizelimit = fs.getContentSummary(srcrootpath).getLength()/2; + System.out.println("sizelimit=" + sizelimit); + + ToolRunner.run(distcp, + new String[]{"-sizelimit", 
""+sizelimit, nnUri+srcrootdir, nnUri+dstrootdir}); + + ContentSummary summary = fs.getContentSummary(dstrootpath); + System.out.println("summary=" + summary); + assertTrue(summary.getLength() <= sizelimit); + deldir(fs, dstrootdir); + deldir(fs, srcrootdir); + } + + {//test update + final MyFile[] srcs = createFiles(URI.create(nnUri), srcrootdir); + final long totalsize = fs.getContentSummary(srcrootpath).getLength(); + System.out.println("src.length=" + srcs.length); + System.out.println("totalsize =" + totalsize); + fs.mkdirs(dstrootpath); + final int parts = RAN.nextInt(NFILES/3 - 1) + 2; + final int filelimit = srcs.length/parts; + final long sizelimit = totalsize/parts; + System.out.println("filelimit=" + filelimit); + System.out.println("sizelimit=" + sizelimit); + System.out.println("parts =" + parts); + final String[] args = {"-filelimit", ""+filelimit, "-sizelimit", ""+sizelimit, + "-update", nnUri+srcrootdir, nnUri+dstrootdir}; + + int dstfilecount = 0; + long dstsize = 0; + for(int i = 0; i <= parts; i++) { + ToolRunner.run(distcp, args); + + FileStatus[] dststat = getFileStatus(fs, dstrootdir, srcs, true); + System.out.println(i + ") dststat.length=" + dststat.length); + assertTrue(dststat.length - dstfilecount <= filelimit); + ContentSummary summary = fs.getContentSummary(dstrootpath); + System.out.println(i + ") summary.getLength()=" + summary.getLength()); + assertTrue(summary.getLength() - dstsize <= sizelimit); + assertTrue(checkFiles(fs, dstrootdir, srcs, true)); + dstfilecount = dststat.length; + dstsize = summary.getLength(); + } + + deldir(fs, dstrootdir); + deldir(fs, srcrootdir); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } + + static final long now = System.currentTimeMillis(); + + static UnixUserGroupInformation createUGI(String name, boolean issuper) { + String username = name + now; + String group = issuper? 
"supergroup": username; + return UnixUserGroupInformation.createImmutable( + new String[]{username, group}); + } + + static Path createHomeDirectory(FileSystem fs, UserGroupInformation ugi + ) throws IOException { + final Path home = new Path("/user/" + ugi.getUserName()); + fs.mkdirs(home); + fs.setOwner(home, ugi.getUserName(), ugi.getGroupNames()[0]); + fs.setPermission(home, new FsPermission((short)0700)); + return home; + } + + public void testHftpAccessControl() throws Exception { + MiniDFSCluster cluster = null; + try { + final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true); + final UnixUserGroupInformation USER_UGI = createUGI("user", false); + + //start cluster by DFS_UGI + final Configuration dfsConf = new Configuration(); + UnixUserGroupInformation.saveToConf(dfsConf, + UnixUserGroupInformation.UGI_PROPERTY_NAME, DFS_UGI); + cluster = new MiniDFSCluster(dfsConf, 2, true, null); + cluster.waitActive(); + + final String httpAdd = dfsConf.get("dfs.http.address"); + final URI nnURI = FileSystem.getDefaultUri(dfsConf); + final String nnUri = nnURI.toString(); + final Path home = createHomeDirectory(FileSystem.get(nnURI, dfsConf), USER_UGI); + + //now, login as USER_UGI + final Configuration userConf = new Configuration(); + UnixUserGroupInformation.saveToConf(userConf, + UnixUserGroupInformation.UGI_PROPERTY_NAME, USER_UGI); + final FileSystem fs = FileSystem.get(nnURI, userConf); + + final Path srcrootpath = new Path(home, "src_root"); + final String srcrootdir = srcrootpath.toString(); + final Path dstrootpath = new Path(home, "dst_root"); + final String dstrootdir = dstrootpath.toString(); + final DistCp distcp = new DistCp(userConf); + + FileSystem.mkdirs(fs, srcrootpath, new FsPermission((short)0700)); + final String[] args = {"hftp://"+httpAdd+srcrootdir, nnUri+dstrootdir}; + + { //copy with permission 000, should fail + fs.setPermission(srcrootpath, new FsPermission((short)0)); + assertEquals(-3, ToolRunner.run(distcp, args)); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } + + /** test -delete */ + public void testDelete() throws Exception { + final Configuration conf = new Configuration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster(conf, 2, true, null); + final URI nnURI = FileSystem.getDefaultUri(conf); + final String nnUri = nnURI.toString(); + final FileSystem fs = FileSystem.get(URI.create(nnUri), conf); + + final DistCp distcp = new DistCp(conf); + final FsShell shell = new FsShell(conf); + + final String srcrootdir = "/src_root"; + final String dstrootdir = "/dst_root"; + + { + //create source files + createFiles(nnURI, srcrootdir); + String srcresults = execCmd(shell, "-lsr", srcrootdir); + srcresults = removePrefix(srcresults, srcrootdir); + System.out.println("srcresults=" + srcresults); + + //create some files in dst + createFiles(nnURI, dstrootdir); + System.out.println("dstrootdir=" + dstrootdir); + shell.run(new String[]{"-lsr", dstrootdir}); + + //run distcp + ToolRunner.run(distcp, + new String[]{"-delete", "-update", "-log", "/log", + nnUri+srcrootdir, nnUri+dstrootdir}); + + //make sure src and dst contains the same files + String dstresults = execCmd(shell, "-lsr", dstrootdir); + dstresults = removePrefix(dstresults, dstrootdir); + System.out.println("first dstresults=" + dstresults); + assertEquals(srcresults, dstresults); + + //create additional file in dst + create(fs, new Path(dstrootdir, "foo")); + create(fs, new Path(dstrootdir, "foobar")); + + //run distcp again + ToolRunner.run(distcp, 
+ new String[]{"-delete", "-update", "-log", "/log2", + nnUri+srcrootdir, nnUri+dstrootdir}); + + //make sure src and dst contains the same files + dstresults = execCmd(shell, "-lsr", dstrootdir); + dstresults = removePrefix(dstresults, dstrootdir); + System.out.println("second dstresults=" + dstresults); + assertEquals(srcresults, dstresults); + + //cleanup + deldir(fs, dstrootdir); + deldir(fs, srcrootdir); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } + + static void create(FileSystem fs, Path f) throws IOException { + FSDataOutputStream out = fs.create(f); + try { + byte[] b = new byte[1024 + RAN.nextInt(1024)]; + RAN.nextBytes(b); + out.write(b); + } finally { + if (out != null) out.close(); + } + } + + static String execCmd(FsShell shell, String... args) throws Exception { + ByteArrayOutputStream baout = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(baout, true); + PrintStream old = System.out; + System.setOut(out); + shell.run(args); + out.close(); + System.setOut(old); + return baout.toString(); + } + + private static String removePrefix(String lines, String prefix) { + final int prefixlen = prefix.length(); + final StringTokenizer t = new StringTokenizer(lines, "\n"); + final StringBuffer results = new StringBuffer(); + for(; t.hasMoreTokens(); ) { + String s = t.nextToken(); + results.append(s.substring(s.indexOf(prefix) + prefixlen) + "\n"); + } + return results.toString(); + } +} \ No newline at end of file diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java new file mode 100644 index 0000000000..b21914eb01 --- /dev/null +++ b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java @@ -0,0 +1,445 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Date; +import java.util.StringTokenizer; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.StringUtils; + +/** + * Distributed i/o benchmark. + *

+ * This test writes into or reads from a specified number of files.
+ * File size is specified as a parameter to the test.
+ * Each file is accessed in a separate map task.
+ * <p>
+ * The reducer collects the following statistics:
+ * <ul>
+ * <li>number of tasks completed</li>
+ * <li>number of bytes written/read</li>
+ * <li>execution time</li>
+ * <li>io rate</li>
+ * <li>io rate squared</li>
+ * </ul>
+ *
+ * Finally, the following information is appended to a local file
+ * <ul>
+ * <li>read or write test</li>
+ * <li>date and time the test finished</li>
+ * <li>number of files</li>
+ * <li>total number of bytes processed</li>
+ * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
+ * <li>average i/o rate in mb/sec per file</li>
+ * <li>standard deviation of i/o rate</li>
+ * </ul>
+ */ +public class TestDFSIO extends TestCase { + // Constants + private static final Log LOG = LogFactory.getLog(TestDFSIO.class); + private static final int TEST_TYPE_READ = 0; + private static final int TEST_TYPE_WRITE = 1; + private static final int TEST_TYPE_CLEANUP = 2; + private static final int DEFAULT_BUFFER_SIZE = 1000000; + private static final String BASE_FILE_NAME = "test_io_"; + private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; + + private static Configuration fsConfig = new Configuration(); + private static final long MEGA = 0x100000; + private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO"); + private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); + private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write"); + private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); + private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); + + /** + * Run the test with default parameters. + * + * @throws Exception + */ + public void testIOs() throws Exception { + testIOs(10, 10); + } + + /** + * Run the test with the specified parameters. + * + * @param fileSize file size + * @param nrFiles number of files + * @throws IOException + */ + public static void testIOs(int fileSize, int nrFiles) + throws IOException { + + FileSystem fs = FileSystem.get(fsConfig); + + createControlFile(fs, fileSize, nrFiles); + writeTest(fs); + readTest(fs); + cleanup(fs); + } + + private static void createControlFile( + FileSystem fs, + int fileSize, // in MB + int nrFiles + ) throws IOException { + LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); + + fs.delete(CONTROL_DIR, true); + + for(int i=0; i < nrFiles; i++) { + String name = getFileName(i); + Path controlFile = new Path(CONTROL_DIR, "in_file_" + name); + SequenceFile.Writer writer = null; + try { + writer = SequenceFile.createWriter(fs, fsConfig, controlFile, + Text.class, LongWritable.class, + CompressionType.NONE); + writer.append(new Text(name), new LongWritable(fileSize)); + } catch(Exception e) { + throw new IOException(e.getLocalizedMessage()); + } finally { + if (writer != null) + writer.close(); + writer = null; + } + } + LOG.info("created control files for: "+nrFiles+" files"); + } + + private static String getFileName(int fIdx) { + return BASE_FILE_NAME + Integer.toString(fIdx); + } + + /** + * Write/Read mapper base class. + *

+ * Collects the following statistics per task:
+ * <ul>
+ * <li>number of tasks completed</li>
+ * <li>number of bytes written/read</li>
+ * <li>execution time</li>
+ * <li>i/o rate</li>
+ * <li>i/o rate squared</li>
+ * </ul>
+ */ + private abstract static class IOStatMapper extends IOMapperBase { + IOStatMapper() { + super(fsConfig); + } + + void collectStats(OutputCollector output, + String name, + long execTime, + Object objSize) throws IOException { + long totalSize = ((Long)objSize).longValue(); + float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); + LOG.info("Number of bytes processed = " + totalSize); + LOG.info("Exec time = " + execTime); + LOG.info("IO rate = " + ioRateMbSec); + + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"), + new Text(String.valueOf(1))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), + new Text(String.valueOf(totalSize))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), + new Text(String.valueOf(execTime))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), + new Text(String.valueOf(ioRateMbSec*1000))); + output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"), + new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); + } + } + + /** + * Write mapper class. + */ + public static class WriteMapper extends IOStatMapper { + + public WriteMapper() { + super(); + for(int i=0; i < bufferSize; i++) + buffer[i] = (byte)('0' + i % 50); + } + + public Object doIO(Reporter reporter, + String name, + long totalSize + ) throws IOException { + // create file + totalSize *= MEGA; + OutputStream out; + out = fs.create(new Path(DATA_DIR, name), true, bufferSize); + + try { + // write to the file + long nrRemaining; + for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { + int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; + out.write(buffer, 0, curSize); + reporter.setStatus("writing " + name + "@" + + (totalSize - nrRemaining) + "/" + totalSize + + " ::host = " + hostName); + } + } finally { + out.close(); + } + return new Long(totalSize); + } + } + + private static void writeTest(FileSystem fs) + throws IOException { + + fs.delete(DATA_DIR, true); + fs.delete(WRITE_DIR, true); + + runIOTest(WriteMapper.class, WRITE_DIR); + } + + private static void runIOTest( Class mapperClass, + Path outputDir + ) throws IOException { + JobConf job = new JobConf(fsConfig, TestDFSIO.class); + + FileInputFormat.setInputPaths(job, CONTROL_DIR); + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(mapperClass); + job.setReducerClass(AccumulatingReducer.class); + + FileOutputFormat.setOutputPath(job, outputDir); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(1); + JobClient.runJob(job); + } + + /** + * Read mapper class. 
+ */ + public static class ReadMapper extends IOStatMapper { + + public ReadMapper() { + super(); + } + + public Object doIO(Reporter reporter, + String name, + long totalSize + ) throws IOException { + totalSize *= MEGA; + // open file + DataInputStream in = fs.open(new Path(DATA_DIR, name)); + try { + long actualSize = 0; + for(int curSize = bufferSize; curSize == bufferSize;) { + curSize = in.read(buffer, 0, bufferSize); + actualSize += curSize; + reporter.setStatus("reading " + name + "@" + + actualSize + "/" + totalSize + + " ::host = " + hostName); + } + } finally { + in.close(); + } + return new Long(totalSize); + } + } + + private static void readTest(FileSystem fs) throws IOException { + fs.delete(READ_DIR, true); + runIOTest(ReadMapper.class, READ_DIR); + } + + private static void sequentialTest( + FileSystem fs, + int testType, + int fileSize, + int nrFiles + ) throws Exception { + IOStatMapper ioer = null; + if (testType == TEST_TYPE_READ) + ioer = new ReadMapper(); + else if (testType == TEST_TYPE_WRITE) + ioer = new WriteMapper(); + else + return; + for(int i=0; i < nrFiles; i++) + ioer.doIO(Reporter.NULL, + BASE_FILE_NAME+Integer.toString(i), + MEGA*fileSize); + } + + public static void main(String[] args) { + int testType = TEST_TYPE_READ; + int bufferSize = DEFAULT_BUFFER_SIZE; + int fileSize = 1; + int nrFiles = 1; + String resFileName = DEFAULT_RES_FILE_NAME; + boolean isSequential = false; + + String className = TestDFSIO.class.getSimpleName(); + String version = className + ".0.0.4"; + String usage = "Usage: " + className + " -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] "; + + System.out.println(version); + if (args.length == 0) { + System.err.println(usage); + System.exit(-1); + } + for (int i = 0; i < args.length; i++) { // parse command line + if (args[i].startsWith("-read")) { + testType = TEST_TYPE_READ; + } else if (args[i].equals("-write")) { + testType = TEST_TYPE_WRITE; + } else if (args[i].equals("-clean")) { + testType = TEST_TYPE_CLEANUP; + } else if (args[i].startsWith("-seq")) { + isSequential = true; + } else if (args[i].equals("-nrFiles")) { + nrFiles = Integer.parseInt(args[++i]); + } else if (args[i].equals("-fileSize")) { + fileSize = Integer.parseInt(args[++i]); + } else if (args[i].equals("-bufferSize")) { + bufferSize = Integer.parseInt(args[++i]); + } else if (args[i].equals("-resFile")) { + resFileName = args[++i]; + } + } + + LOG.info("nrFiles = " + nrFiles); + LOG.info("fileSize (MB) = " + fileSize); + LOG.info("bufferSize = " + bufferSize); + + try { + fsConfig.setInt("test.io.file.buffer.size", bufferSize); + FileSystem fs = FileSystem.get(fsConfig); + + if (isSequential) { + long tStart = System.currentTimeMillis(); + sequentialTest(fs, testType, fileSize, nrFiles); + long execTime = System.currentTimeMillis() - tStart; + String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; + LOG.info(resultLine); + return; + } + if (testType == TEST_TYPE_CLEANUP) { + cleanup(fs); + return; + } + createControlFile(fs, fileSize, nrFiles); + long tStart = System.currentTimeMillis(); + if (testType == TEST_TYPE_WRITE) + writeTest(fs); + if (testType == TEST_TYPE_READ) + readTest(fs); + long execTime = System.currentTimeMillis() - tStart; + + analyzeResult(fs, testType, execTime, resFileName); + } catch(Exception e) { + System.err.print(StringUtils.stringifyException(e)); + System.exit(-1); + } + } + + private static void analyzeResult( FileSystem fs, + int testType, + long execTime, + 
String resFileName + ) throws IOException { + Path reduceFile; + if (testType == TEST_TYPE_WRITE) + reduceFile = new Path(WRITE_DIR, "part-00000"); + else + reduceFile = new Path(READ_DIR, "part-00000"); + DataInputStream in; + in = new DataInputStream(fs.open(reduceFile)); + + BufferedReader lines; + lines = new BufferedReader(new InputStreamReader(in)); + long tasks = 0; + long size = 0; + long time = 0; + float rate = 0; + float sqrate = 0; + String line; + while((line = lines.readLine()) != null) { + StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); + String attr = tokens.nextToken(); + if (attr.endsWith(":tasks")) + tasks = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":size")) + size = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":time")) + time = Long.parseLong(tokens.nextToken()); + else if (attr.endsWith(":rate")) + rate = Float.parseFloat(tokens.nextToken()); + else if (attr.endsWith(":sqrate")) + sqrate = Float.parseFloat(tokens.nextToken()); + } + + double med = rate / 1000 / tasks; + double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med)); + String resultLines[] = { + "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : + (testType == TEST_TYPE_READ) ? "read" : + "unknown"), + " Date & time: " + new Date(System.currentTimeMillis()), + " Number of files: " + tasks, + "Total MBytes processed: " + size/MEGA, + " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), + "Average IO rate mb/sec: " + med, + " IO rate std deviation: " + stdDev, + " Test exec time sec: " + (float)execTime / 1000, + "" }; + + PrintStream res = new PrintStream( + new FileOutputStream( + new File(resFileName), true)); + for(int i = 0; i < resultLines.length; i++) { + LOG.info(resultLines[i]); + res.println(resultLines[i]); + } + } + + private static void cleanup(FileSystem fs) throws IOException { + LOG.info("Cleaning up test files"); + fs.delete(new Path(TEST_ROOT_DIR), true); + } +} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java new file mode 100644 index 0000000000..c83993d0ca --- /dev/null +++ b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java @@ -0,0 +1,629 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Random; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import java.util.Map; +import java.util.HashMap; +import java.net.InetSocketAddress; +import java.net.URI; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.fs.shell.CommandFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.lib.LongSumReducer; +import org.apache.hadoop.security.UnixUserGroupInformation; + +public class TestFileSystem extends TestCase { + private static final Log LOG = FileSystem.LOG; + + private static Configuration conf = new Configuration(); + private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096); + + private static final long MEGA = 1024 * 1024; + private static final int SEEKS_PER_FILE = 4; + + private static String ROOT = System.getProperty("test.build.data","fs_test"); + private static Path CONTROL_DIR = new Path(ROOT, "fs_control"); + private static Path WRITE_DIR = new Path(ROOT, "fs_write"); + private static Path READ_DIR = new Path(ROOT, "fs_read"); + private static Path DATA_DIR = new Path(ROOT, "fs_data"); + + public void testFs() throws Exception { + testFs(10 * MEGA, 100, 0); + } + + public static void testFs(long megaBytes, int numFiles, long seed) + throws Exception { + + FileSystem fs = FileSystem.get(conf); + + if (seed == 0) + seed = new Random().nextLong(); + + LOG.info("seed = "+seed); + + createControlFile(fs, megaBytes, numFiles, seed); + writeTest(fs, false); + readTest(fs, false); + seekTest(fs, false); + fs.delete(CONTROL_DIR, true); + fs.delete(DATA_DIR, true); + fs.delete(WRITE_DIR, true); + fs.delete(READ_DIR, true); + } + + public static void testCommandFormat() throws Exception { + // This should go to TestFsShell.java when it is added. 
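+ // A brief sketch of what the assertions below exercise:
+ // CommandFormat(name, min, max, opts...) declares how many positional
+ // arguments a shell command accepts and which "-opt" flags it knows;
+ // parse(args, 1) skips the command token itself, strips the declared
+ // flags, and returns the remaining positional arguments in order
+ // (a bare "-" is kept as a positional argument).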
+ CommandFormat cf; + cf= new CommandFormat("copyToLocal", 2,2,"crc","ignoreCrc"); + assertEquals(cf.parse(new String[] {"-get","file", "-"}, 1).get(1), "-"); + assertEquals(cf.parse(new String[] {"-get","file","-ignoreCrc","/foo"}, 1).get(1),"/foo"); + cf = new CommandFormat("tail", 1, 1, "f"); + assertEquals(cf.parse(new String[] {"-tail","fileName"}, 1).get(0),"fileName"); + assertEquals(cf.parse(new String[] {"-tail","-f","fileName"}, 1).get(0),"fileName"); + cf = new CommandFormat("setrep", 2, 2, "R", "w"); + assertEquals(cf.parse(new String[] {"-setrep","-R","2","/foo/bar"}, 1).get(1), "/foo/bar"); + cf = new CommandFormat("put", 2, 10000); + assertEquals(cf.parse(new String[] {"-put", "-", "dest"}, 1).get(1), "dest"); + } + + public static void createControlFile(FileSystem fs, + long megaBytes, int numFiles, + long seed) throws Exception { + + LOG.info("creating control file: "+megaBytes+" bytes, "+numFiles+" files"); + + Path controlFile = new Path(CONTROL_DIR, "files"); + fs.delete(controlFile, true); + Random random = new Random(seed); + + SequenceFile.Writer writer = + SequenceFile.createWriter(fs, conf, controlFile, + Text.class, LongWritable.class, CompressionType.NONE); + + long totalSize = 0; + long maxSize = ((megaBytes / numFiles) * 2) + 1; + try { + while (totalSize < megaBytes) { + Text name = new Text(Long.toString(random.nextLong())); + + long size = random.nextLong(); + if (size < 0) + size = -size; + size = size % maxSize; + + //LOG.info(" adding: name="+name+" size="+size); + + writer.append(name, new LongWritable(size)); + + totalSize += size; + } + } finally { + writer.close(); + } + LOG.info("created control file for: "+totalSize+" bytes"); + } + + public static class WriteMapper extends Configured + implements Mapper { + + private Random random = new Random(); + private byte[] buffer = new byte[BUFFER_SIZE]; + private FileSystem fs; + private boolean fastCheck; + + // a random suffix per task + private String suffix = "-"+random.nextLong(); + + { + try { + fs = FileSystem.get(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public WriteMapper() { super(null); } + + public WriteMapper(Configuration conf) { super(conf); } + + public void configure(JobConf job) { + setConf(job); + fastCheck = job.getBoolean("fs.test.fastCheck", false); + } + + public void map(Text key, LongWritable value, + OutputCollector collector, + Reporter reporter) + throws IOException { + + String name = key.toString(); + long size = value.get(); + long seed = Long.parseLong(name); + + random.setSeed(seed); + reporter.setStatus("creating " + name); + + // write to temp file initially to permit parallel execution + Path tempFile = new Path(DATA_DIR, name+suffix); + OutputStream out = fs.create(tempFile); + + long written = 0; + try { + while (written < size) { + if (fastCheck) { + Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE)); + } else { + random.nextBytes(buffer); + } + long remains = size - written; + int length = (remains<=buffer.length) ? 
(int)remains : buffer.length; + out.write(buffer, 0, length); + written += length; + reporter.setStatus("writing "+name+"@"+written+"/"+size); + } + } finally { + out.close(); + } + // rename to final location + fs.rename(tempFile, new Path(DATA_DIR, name)); + + collector.collect(new Text("bytes"), new LongWritable(written)); + + reporter.setStatus("wrote " + name); + } + + public void close() { + } + + } + + public static void writeTest(FileSystem fs, boolean fastCheck) + throws Exception { + + fs.delete(DATA_DIR, true); + fs.delete(WRITE_DIR, true); + + JobConf job = new JobConf(conf, TestFileSystem.class); + job.setBoolean("fs.test.fastCheck", fastCheck); + + FileInputFormat.setInputPaths(job, CONTROL_DIR); + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(WriteMapper.class); + job.setReducerClass(LongSumReducer.class); + + FileOutputFormat.setOutputPath(job, WRITE_DIR); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + job.setNumReduceTasks(1); + JobClient.runJob(job); + } + + public static class ReadMapper extends Configured + implements Mapper { + + private Random random = new Random(); + private byte[] buffer = new byte[BUFFER_SIZE]; + private byte[] check = new byte[BUFFER_SIZE]; + private FileSystem fs; + private boolean fastCheck; + + { + try { + fs = FileSystem.get(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public ReadMapper() { super(null); } + + public ReadMapper(Configuration conf) { super(conf); } + + public void configure(JobConf job) { + setConf(job); + fastCheck = job.getBoolean("fs.test.fastCheck", false); + } + + public void map(Text key, LongWritable value, + OutputCollector collector, + Reporter reporter) + throws IOException { + + String name = key.toString(); + long size = value.get(); + long seed = Long.parseLong(name); + + random.setSeed(seed); + reporter.setStatus("opening " + name); + + DataInputStream in = + new DataInputStream(fs.open(new Path(DATA_DIR, name))); + + long read = 0; + try { + while (read < size) { + long remains = size - read; + int n = (remains<=buffer.length) ? 
(int)remains : buffer.length; + in.readFully(buffer, 0, n); + read += n; + if (fastCheck) { + Arrays.fill(check, (byte)random.nextInt(Byte.MAX_VALUE)); + } else { + random.nextBytes(check); + } + if (n != buffer.length) { + Arrays.fill(buffer, n, buffer.length, (byte)0); + Arrays.fill(check, n, check.length, (byte)0); + } + assertTrue(Arrays.equals(buffer, check)); + + reporter.setStatus("reading "+name+"@"+read+"/"+size); + + } + } finally { + in.close(); + } + + collector.collect(new Text("bytes"), new LongWritable(read)); + + reporter.setStatus("read " + name); + } + + public void close() { + } + + } + + public static void readTest(FileSystem fs, boolean fastCheck) + throws Exception { + + fs.delete(READ_DIR, true); + + JobConf job = new JobConf(conf, TestFileSystem.class); + job.setBoolean("fs.test.fastCheck", fastCheck); + + + FileInputFormat.setInputPaths(job, CONTROL_DIR); + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(ReadMapper.class); + job.setReducerClass(LongSumReducer.class); + + FileOutputFormat.setOutputPath(job, READ_DIR); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + job.setNumReduceTasks(1); + JobClient.runJob(job); + } + + + public static class SeekMapper extends Configured + implements Mapper { + + private Random random = new Random(); + private byte[] check = new byte[BUFFER_SIZE]; + private FileSystem fs; + private boolean fastCheck; + + { + try { + fs = FileSystem.get(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public SeekMapper() { super(null); } + + public SeekMapper(Configuration conf) { super(conf); } + + public void configure(JobConf job) { + setConf(job); + fastCheck = job.getBoolean("fs.test.fastCheck", false); + } + + public void map(Text key, LongWritable value, + OutputCollector collector, + Reporter reporter) + throws IOException { + String name = key.toString(); + long size = value.get(); + long seed = Long.parseLong(name); + + if (size == 0) return; + + reporter.setStatus("opening " + name); + + FSDataInputStream in = fs.open(new Path(DATA_DIR, name)); + + try { + for (int i = 0; i < SEEKS_PER_FILE; i++) { + // generate a random position + long position = Math.abs(random.nextLong()) % size; + + // seek file to that position + reporter.setStatus("seeking " + name); + in.seek(position); + byte b = in.readByte(); + + // check that byte matches + byte checkByte = 0; + // advance random state to that position + random.setSeed(seed); + for (int p = 0; p <= position; p+= check.length) { + reporter.setStatus("generating data for " + name); + if (fastCheck) { + checkByte = (byte)random.nextInt(Byte.MAX_VALUE); + } else { + random.nextBytes(check); + checkByte = check[(int)(position % check.length)]; + } + } + assertEquals(b, checkByte); + } + } finally { + in.close(); + } + } + + public void close() { + } + + } + + public static void seekTest(FileSystem fs, boolean fastCheck) + throws Exception { + + fs.delete(READ_DIR, true); + + JobConf job = new JobConf(conf, TestFileSystem.class); + job.setBoolean("fs.test.fastCheck", fastCheck); + + FileInputFormat.setInputPaths(job,CONTROL_DIR); + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(SeekMapper.class); + job.setReducerClass(LongSumReducer.class); + + FileOutputFormat.setOutputPath(job, READ_DIR); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + job.setNumReduceTasks(1); + JobClient.runJob(job); + } + + + public static void main(String[] args) 
throws Exception { + int megaBytes = 10; + int files = 100; + boolean noRead = false; + boolean noWrite = false; + boolean noSeek = false; + boolean fastCheck = false; + long seed = new Random().nextLong(); + + String usage = "Usage: TestFileSystem -files N -megaBytes M [-noread] [-nowrite] [-noseek] [-fastcheck]"; + + if (args.length == 0) { + System.err.println(usage); + System.exit(-1); + } + for (int i = 0; i < args.length; i++) { // parse command line + if (args[i].equals("-files")) { + files = Integer.parseInt(args[++i]); + } else if (args[i].equals("-megaBytes")) { + megaBytes = Integer.parseInt(args[++i]); + } else if (args[i].equals("-noread")) { + noRead = true; + } else if (args[i].equals("-nowrite")) { + noWrite = true; + } else if (args[i].equals("-noseek")) { + noSeek = true; + } else if (args[i].equals("-fastcheck")) { + fastCheck = true; + } + } + + LOG.info("seed = "+seed); + LOG.info("files = " + files); + LOG.info("megaBytes = " + megaBytes); + + FileSystem fs = FileSystem.get(conf); + + if (!noWrite) { + createControlFile(fs, megaBytes*MEGA, files, seed); + writeTest(fs, fastCheck); + } + if (!noRead) { + readTest(fs, fastCheck); + } + if (!noSeek) { + seekTest(fs, fastCheck); + } + } + + static Configuration createConf4Testing(String username) throws Exception { + Configuration conf = new Configuration(); + UnixUserGroupInformation.saveToConf(conf, + UnixUserGroupInformation.UGI_PROPERTY_NAME, + new UnixUserGroupInformation(username, new String[]{"group"})); + return conf; + } + + public void testFsCache() throws Exception { + { + long now = System.currentTimeMillis(); + Configuration[] conf = {new Configuration(), + createConf4Testing("foo" + now), createConf4Testing("bar" + now)}; + FileSystem[] fs = new FileSystem[conf.length]; + + for(int i = 0; i < conf.length; i++) { + fs[i] = FileSystem.get(conf[i]); + assertEquals(fs[i], FileSystem.get(conf[i])); + for(int j = 0; j < i; j++) { + assertFalse(fs[j] == fs[i]); + } + } + FileSystem.closeAll(); + } + + { + try { + runTestCache(NameNode.DEFAULT_PORT); + } catch(java.net.BindException be) { + LOG.warn("Cannot test NameNode.DEFAULT_PORT (=" + + NameNode.DEFAULT_PORT + ")", be); + } + + runTestCache(0); + } + } + + static void runTestCache(int port) throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster(port, conf, 2, true, true, null, null); + URI uri = cluster.getFileSystem().getUri(); + LOG.info("uri=" + uri); + + { + FileSystem fs = FileSystem.get(uri, new Configuration()); + checkPath(cluster, fs); + for(int i = 0; i < 100; i++) { + assertTrue(fs == FileSystem.get(uri, new Configuration())); + } + } + + if (port == NameNode.DEFAULT_PORT) { + //test explicit default port + URI uri2 = new URI(uri.getScheme(), uri.getUserInfo(), + uri.getHost(), NameNode.DEFAULT_PORT, uri.getPath(), + uri.getQuery(), uri.getFragment()); + LOG.info("uri2=" + uri2); + FileSystem fs = FileSystem.get(uri2, conf); + checkPath(cluster, fs); + for(int i = 0; i < 100; i++) { + assertTrue(fs == FileSystem.get(uri2, new Configuration())); + } + } + } finally { + if (cluster != null) cluster.shutdown(); + } + } + + static void checkPath(MiniDFSCluster cluster, FileSystem fileSys) throws IOException { + InetSocketAddress add = cluster.getNameNode().getNameNodeAddress(); + // Test upper/lower case + fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort())); + } + + public void testFsClose() throws Exception { + { + Configuration 
conf = new Configuration(); + new Path("file:///").getFileSystem(conf); + UnixUserGroupInformation.login(conf, true); + FileSystem.closeAll(); + } + + { + Configuration conf = new Configuration(); + new Path("hftp://localhost:12345/").getFileSystem(conf); + UnixUserGroupInformation.login(conf, true); + FileSystem.closeAll(); + } + + { + Configuration conf = new Configuration(); + FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf); + UnixUserGroupInformation.login(fs.getConf(), true); + FileSystem.closeAll(); + } + } + + + public void testCacheKeysAreCaseInsensitive() + throws Exception + { + Configuration conf = new Configuration(); + + // check basic equality + FileSystem.Cache.Key lowercaseCachekey1 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf); + FileSystem.Cache.Key lowercaseCachekey2 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf); + assertEquals( lowercaseCachekey1, lowercaseCachekey2 ); + + // check insensitive equality + FileSystem.Cache.Key uppercaseCachekey = new FileSystem.Cache.Key(new URI("HFTP://Localhost:12345/"), conf); + assertEquals( lowercaseCachekey2, uppercaseCachekey ); + + // check behaviour with collections + List list = new ArrayList(); + list.add(uppercaseCachekey); + assertTrue(list.contains(uppercaseCachekey)); + assertTrue(list.contains(lowercaseCachekey2)); + + Set set = new HashSet(); + set.add(uppercaseCachekey); + assertTrue(set.contains(uppercaseCachekey)); + assertTrue(set.contains(lowercaseCachekey2)); + + Map map = new HashMap(); + map.put(uppercaseCachekey, ""); + assertTrue(map.containsKey(uppercaseCachekey)); + assertTrue(map.containsKey(lowercaseCachekey2)); + + } + + public static void testFsUniqueness(long megaBytes, int numFiles, long seed) + throws Exception { + + // multiple invocations of FileSystem.get return the same object. + FileSystem fs1 = FileSystem.get(conf); + FileSystem fs2 = FileSystem.get(conf); + assertTrue(fs1 == fs2); + + // multiple invocations of FileSystem.newInstance return different objects + fs1 = FileSystem.newInstance(conf); + fs2 = FileSystem.newInstance(conf); + assertTrue(fs1 != fs2 && !fs1.equals(fs2)); + fs1.close(); + fs2.close(); + } +} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java new file mode 100644 index 0000000000..f9a10c88a3 --- /dev/null +++ b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.util.Iterator; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.tools.HadoopArchives; +import org.apache.hadoop.util.ToolRunner; + +/** + * test the har file system + * create a har filesystem + * run fs commands + * and then run a map reduce job + */ +public class TestHarFileSystem extends TestCase { + private Path inputPath; + private MiniDFSCluster dfscluster; + private MiniMRCluster mapred; + private FileSystem fs; + private Path filea, fileb, filec; + private Path archivePath; + + protected void setUp() throws Exception { + super.setUp(); + dfscluster = new MiniDFSCluster(new JobConf(), 2, true, null); + fs = dfscluster.getFileSystem(); + mapred = new MiniMRCluster(2, fs.getUri().toString(), 1); + inputPath = new Path(fs.getHomeDirectory(), "test"); + filea = new Path(inputPath,"a"); + fileb = new Path(inputPath,"b"); + filec = new Path(inputPath,"c"); + archivePath = new Path(fs.getHomeDirectory(), "tmp"); + } + + protected void tearDown() throws Exception { + try { + if (mapred != null) { + mapred.shutdown(); + } + if (dfscluster != null) { + dfscluster.shutdown(); + } + } catch(Exception e) { + System.err.println(e); + } + super.tearDown(); + } + + static class TextMapperReducer implements Mapper, + Reducer { + + public void configure(JobConf conf) { + //do nothing + } + + public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { + output.collect(value, new Text("")); + } + + public void close() throws IOException { + // do nothing + } + + public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { + while(values.hasNext()) { + values.next(); + output.collect(key, null); + } + } + } + + public void testArchives() throws Exception { + fs.mkdirs(inputPath); + + FSDataOutputStream out = fs.create(filea); + out.write("a".getBytes()); + out.close(); + out = fs.create(fileb); + out.write("b".getBytes()); + out.close(); + out = fs.create(filec); + out.write("c".getBytes()); + out.close(); + Configuration conf = mapred.createJobConf(); + HadoopArchives har = new HadoopArchives(conf); + String[] args = new String[3]; + //check for destination not specfied + args[0] = "-archiveName"; + args[1] = "foo.har"; + args[2] = inputPath.toString(); + int ret = ToolRunner.run(har, args); + assertTrue(ret != 0); + args = new String[4]; + //check for wrong archiveName + args[0] = "-archiveName"; + args[1] = "/d/foo.har"; + args[2] = inputPath.toString(); + args[3] = archivePath.toString(); + ret = ToolRunner.run(har, args); + assertTrue(ret != 0); +// se if dest is a file + args[1] = "foo.har"; + args[3] = filec.toString(); + ret = ToolRunner.run(har, args); + assertTrue(ret != 0); + //this is a valid run + args[0] = "-archiveName"; + args[1] = "foo.har"; + args[2] = inputPath.toString(); + args[3] = archivePath.toString(); + ret = ToolRunner.run(har, args); + //checl for the existenece of the archive + assertTrue(ret == 0); + ///try running it again. 
it should not + // override the directory + ret = ToolRunner.run(har, args); + assertTrue(ret != 0); + Path finalPath = new Path(archivePath, "foo.har"); + Path fsPath = new Path(inputPath.toUri().getPath()); + String relative = fsPath.toString().substring(1); + Path filePath = new Path(finalPath, relative); + //make it a har path + Path harPath = new Path("har://" + filePath.toUri().getPath()); + assertTrue(fs.exists(new Path(finalPath, "_index"))); + assertTrue(fs.exists(new Path(finalPath, "_masterindex"))); + assertTrue(!fs.exists(new Path(finalPath, "_logs"))); + //creation tested + //check if the archive is same + // do ls and cat on all the files + FsShell shell = new FsShell(conf); + args = new String[2]; + args[0] = "-ls"; + args[1] = harPath.toString(); + ret = ToolRunner.run(shell, args); + // ls should work. + assertTrue((ret == 0)); + //now check for contents of filea + // fileb and filec + Path harFilea = new Path(harPath, "a"); + Path harFileb = new Path(harPath, "b"); + Path harFilec = new Path(harPath, "c"); + FileSystem harFs = harFilea.getFileSystem(conf); + FSDataInputStream fin = harFs.open(harFilea); + byte[] b = new byte[4]; + int readBytes = fin.read(b); + assertTrue("Empty read.", readBytes > 0); + fin.close(); + assertTrue("strings are equal ", (b[0] == "a".getBytes()[0])); + fin = harFs.open(harFileb); + readBytes = fin.read(b); + assertTrue("Empty read.", readBytes > 0); + fin.close(); + assertTrue("strings are equal ", (b[0] == "b".getBytes()[0])); + fin = harFs.open(harFilec); + readBytes = fin.read(b); + assertTrue("Empty read.", readBytes > 0); + fin.close(); + assertTrue("strings are equal ", (b[0] == "c".getBytes()[0])); + // ok all files match + // run a map reduce job + Path outdir = new Path(fs.getHomeDirectory(), "mapout"); + JobConf jobconf = mapred.createJobConf(); + FileInputFormat.addInputPath(jobconf, harPath); + jobconf.setInputFormat(TextInputFormat.class); + jobconf.setOutputFormat(TextOutputFormat.class); + FileOutputFormat.setOutputPath(jobconf, outdir); + jobconf.setMapperClass(TextMapperReducer.class); + jobconf.setMapOutputKeyClass(Text.class); + jobconf.setMapOutputValueClass(Text.class); + jobconf.setReducerClass(TextMapperReducer.class); + jobconf.setNumReduceTasks(1); + JobClient.runJob(jobconf); + args[1] = outdir.toString(); + ret = ToolRunner.run(shell, args); + + FileStatus[] status = fs.globStatus(new Path(outdir, "part*")); + Path reduceFile = status[0].getPath(); + FSDataInputStream reduceIn = fs.open(reduceFile); + b = new byte[6]; + readBytes = reduceIn.read(b); + assertTrue("Should read 6 bytes.", readBytes == 6); + //assuming all the 6 bytes were read. + Text readTxt = new Text(b); + assertTrue("a\nb\nc\n".equals(readTxt.toString())); + assertTrue("number of bytes left should be -1", reduceIn.read(b) == -1); + reduceIn.close(); + } +} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/hdfs/NNBench.java b/src/test/hdfs-with-mr/org/apache/hadoop/hdfs/NNBench.java new file mode 100644 index 0000000000..22b8679fb7 --- /dev/null +++ b/src/test/hdfs-with-mr/org/apache/hadoop/hdfs/NNBench.java @@ -0,0 +1,964 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.Date; +import java.io.DataInputStream; +import java.io.FileOutputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.io.File; +import java.io.BufferedReader; +import java.util.StringTokenizer; +import java.net.InetAddress; +import java.text.SimpleDateFormat; +import java.util.Iterator; + +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile; + +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reducer; + +/** + * This program executes a specified operation that applies load to + * the NameNode. + * + * When run simultaneously on multiple nodes, this program functions + * as a stress-test and benchmark for namenode, especially when + * the number of bytes written to each file is small. + * + * Valid operations are: + * create_write + * open_read + * rename + * delete + * + * NOTE: The open_read, rename and delete operations assume that the files + * they operate on are already available. The create_write operation + * must be run before running the other operations. 
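+ *
+ * An illustrative two-step run (only the option names printed by
+ * displayUsage() below are shown; all other options keep their defaults):
+ *
+ *   nnbench -operation create_write -maps 4 -reduces 1
+ *   nnbench -operation open_read -maps 4 -reduces 1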
+ */ + +public class NNBench { + private static final Log LOG = LogFactory.getLog( + "org.apache.hadoop.hdfs.NNBench"); + + protected static String CONTROL_DIR_NAME = "control"; + protected static String OUTPUT_DIR_NAME = "output"; + protected static String DATA_DIR_NAME = "data"; + protected static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log"; + protected static final String NNBENCH_VERSION = "NameNode Benchmark 0.4"; + + public static String operation = "none"; + public static long numberOfMaps = 1l; // default is 1 + public static long numberOfReduces = 1l; // default is 1 + public static long startTime = + System.currentTimeMillis() + (120 * 1000); // default is 'now' + 2min + public static long blockSize = 1l; // default is 1 + public static int bytesToWrite = 0; // default is 0 + public static long bytesPerChecksum = 1l; // default is 1 + public static long numberOfFiles = 1l; // default is 1 + public static short replicationFactorPerFile = 1; // default is 1 + public static String baseDir = "/benchmarks/NNBench"; // default + public static boolean readFileAfterOpen = false; // default is to not read + + // Supported operations + private static final String OP_CREATE_WRITE = "create_write"; + private static final String OP_OPEN_READ = "open_read"; + private static final String OP_RENAME = "rename"; + private static final String OP_DELETE = "delete"; + + // To display in the format that matches the NN and DN log format + // Example: 2007-10-26 00:01:19,853 + static SimpleDateFormat sdf = + new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S"); + + private static Configuration config = new Configuration(); + + /** + * Clean up the files before a test run + * + * @throws IOException on error + */ + private static void cleanupBeforeTestrun() throws IOException { + FileSystem tempFS = FileSystem.get(config); + + // Delete the data directory only if it is the create/write operation + if (operation.equals(OP_CREATE_WRITE)) { + LOG.info("Deleting data directory"); + tempFS.delete(new Path(baseDir, DATA_DIR_NAME), true); + } + tempFS.delete(new Path(baseDir, CONTROL_DIR_NAME), true); + tempFS.delete(new Path(baseDir, OUTPUT_DIR_NAME), true); + } + + /** + * Create control files before a test run. + * Number of files created is equal to the number of maps specified + * + * @throws IOException on error + */ + private static void createControlFiles() throws IOException { + FileSystem tempFS = FileSystem.get(config); + LOG.info("Creating " + numberOfMaps + " control files"); + + for (int i = 0; i < numberOfMaps; i++) { + String strFileName = "NNBench_Controlfile_" + i; + Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME), + strFileName); + + SequenceFile.Writer writer = null; + try { + writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class, + LongWritable.class, CompressionType.NONE); + writer.append(new Text(strFileName), new LongWritable(0l)); + } catch(Exception e) { + throw new IOException(e.getLocalizedMessage()); + } finally { + if (writer != null) { + writer.close(); + } + writer = null; + } + } + } + /** + * Display version + */ + private static void displayVersion() { + System.out.println(NNBENCH_VERSION); + } + + /** + * Display usage + */ + private static void displayUsage() { + String usage = + "Usage: nnbench \n" + + "Options:\n" + + "\t-operation \n" + + "\t * NOTE: The open_read, rename and delete operations assume " + + "that the files they operate on, are already available. 
" + + "The create_write operation must be run before running the " + + "other operations.\n" + + "\t-maps \n" + + "\t-reduces \n" + + "\t-startTime