diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1e1c6c9826..6572d6c733 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -12,6 +12,8 @@ Trunk (unreleased changes) (Plamen Jeliazkov via shv) IMPROVEMENTS + MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk) + MAPREDUCE-3597. [Rumen] Rumen should provide APIs to access all the job-history related information. diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java index 6d8c166a67..54f1730cf2 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java @@ -101,13 +101,15 @@ public void addJobStats(Job job, JobStory jobdesc) { } int maps = 0; + int reds = 0; if (jobdesc == null) { throw new IllegalArgumentException( " JobStory not available for job " + job.getJobName()); } else { maps = jobdesc.getNumberMaps(); + reds = jobdesc.getNumberReduces(); } - JobStats stats = new JobStats(maps,job); + JobStats stats = new JobStats(maps, reds, job); jobMaps.put(seq,stats); } @@ -258,15 +260,20 @@ public void abort() { */ static class JobStats { private int noOfMaps; + private int noOfReds; private Job job; - public JobStats(int noOfMaps,Job job){ + public JobStats(int noOfMaps,int numOfReds, Job job){ this.job = job; this.noOfMaps = noOfMaps; + this.noOfReds = numOfReds; } public int getNoOfMaps() { return noOfMaps; } + public int getNoOfReds() { + return noOfReds; + } /** * Returns the job , diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java index 1060321dca..d78d631333 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java @@ -31,13 +31,12 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.locks.Condition; +import java.util.concurrent.atomic.AtomicBoolean; public class StressJobFactory extends JobFactory { public static final Log LOG = LogFactory.getLog(StressJobFactory.class); private final LoadStatus loadStatus = new LoadStatus(); - private final Condition condUnderloaded = this.lock.newCondition(); /** * The minimum ratio between pending+running map tasks (aka. incomplete map * tasks) and cluster map slot capacity for us to consider the cluster is @@ -150,23 +149,32 @@ public void run() { } LOG.info("START STRESS @ " + System.currentTimeMillis()); while (!Thread.currentThread().isInterrupted()) { - lock.lock(); try { while (loadStatus.overloaded()) { - //Wait while JT is overloaded. + if (LOG.isDebugEnabled()) { + LOG.debug("Cluster overloaded in run! Sleeping..."); + } + // sleep try { - condUnderloaded.await(); + Thread.sleep(1000); } catch (InterruptedException ie) { return; } } while (!loadStatus.overloaded()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cluster underloaded in run! Stressing..."); + } try { + //TODO This in-line read can block submission for large jobs. final JobStory job = getNextJobFiltered(); if (null == job) { return; } + if (LOG.isDebugEnabled()) { + LOG.debug("Job Selected: " + job.getJobID()); + } submitter.add( jobCreator.createGridmixJob( conf, 0L, job, scratch, @@ -175,14 +183,20 @@ public void run() { sequence.getAndIncrement())); // TODO: We need to take care of scenario when one map/reduce // takes more than 1 slot. - loadStatus.mapSlotsBackfill -= - calcEffectiveIncompleteMapTasks( - loadStatus.mapSlotCapacity, job.getNumberMaps(), 0.0f); - loadStatus.reduceSlotsBackfill -= - calcEffectiveIncompleteReduceTasks( - loadStatus.reduceSlotCapacity, job.getNumberReduces(), - 0.0f); - --loadStatus.numJobsBackfill; + + // Lock the loadjob as we are making updates + int incompleteMapTasks = (int) calcEffectiveIncompleteMapTasks( + loadStatus.getMapCapacity(), + job.getNumberMaps(), 0.0f); + loadStatus.decrementMapLoad(incompleteMapTasks); + + int incompleteReduceTasks = + (int) calcEffectiveIncompleteReduceTasks( + loadStatus.getReduceCapacity(), + job.getNumberReduces(), 0.0f); + loadStatus.decrementReduceLoad(incompleteReduceTasks); + + loadStatus.decrementJobLoad(1); } catch (IOException e) { LOG.error("Error while submitting the job ", e); error = e; @@ -191,7 +205,7 @@ public void run() { } } finally { - lock.unlock(); + // do nothing } } } catch (InterruptedException e) { @@ -210,19 +224,11 @@ public void run() { */ @Override public void update(Statistics.ClusterStats item) { - lock.lock(); + ClusterStatus clusterMetrics = item.getStatus(); try { - ClusterStatus clusterMetrics = item.getStatus(); - try { - checkLoadAndGetSlotsToBackfill(item,clusterMetrics); - } catch (Exception e) { - LOG.error("Couldn't get the new Status",e); - } - if (!loadStatus.overloaded()) { - condUnderloaded.signalAll(); - } - } finally { - lock.unlock(); + checkLoadAndGetSlotsToBackfill(item, clusterMetrics); + } catch (Exception e) { + LOG.error("Couldn't get the new Status",e); } } @@ -254,18 +260,25 @@ float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity, */ private void checkLoadAndGetSlotsToBackfill( ClusterStats stats, ClusterStatus clusterStatus) throws IOException, InterruptedException { - loadStatus.mapSlotCapacity = clusterStatus.getMaxMapTasks(); - loadStatus.reduceSlotCapacity = clusterStatus.getMaxReduceTasks(); + // update the max cluster capacity incase its updated + int mapCapacity = clusterStatus.getMaxMapTasks(); + loadStatus.updateMapCapacity(mapCapacity); - loadStatus.numJobsBackfill = - (int) (maxJobTrackerRatio * clusterStatus.getTaskTrackers()) - - stats.getNumRunningJob(); - if (loadStatus.numJobsBackfill <= 0) { + int reduceCapacity = clusterStatus.getMaxReduceTasks(); + + loadStatus.updateReduceCapacity(reduceCapacity); + + int numTrackers = clusterStatus.getTaskTrackers(); + + int jobLoad = + (int) (maxJobTrackerRatio * numTrackers) - stats.getNumRunningJob(); + loadStatus.updateJobLoad(jobLoad); + if (loadStatus.getJobLoad() <= 0) { if (LOG.isDebugEnabled()) { - LOG.debug(System.currentTimeMillis() + " Overloaded is " + LOG.debug(System.currentTimeMillis() + " [JobLoad] Overloaded is " + Boolean.TRUE.toString() + " NumJobsBackfill is " - + loadStatus.numJobsBackfill); + + loadStatus.getJobLoad()); } return; // stop calculation because we know it is overloaded. } @@ -275,56 +288,84 @@ private void checkLoadAndGetSlotsToBackfill( float mapProgress = job.getJob().mapProgress(); int noOfMaps = job.getNoOfMaps(); incompleteMapTasks += - calcEffectiveIncompleteMapTasks( - clusterStatus.getMaxMapTasks(), noOfMaps, mapProgress); + calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress); } - loadStatus.mapSlotsBackfill = - (int) ((overloadMapTaskMapSlotRatio * clusterStatus.getMaxMapTasks()) - - incompleteMapTasks); - if (loadStatus.mapSlotsBackfill <= 0) { + + int mapSlotsBackFill = + (int) ((overloadMapTaskMapSlotRatio * mapCapacity) - incompleteMapTasks); + loadStatus.updateMapLoad(mapSlotsBackFill); + + if (loadStatus.getMapLoad() <= 0) { if (LOG.isDebugEnabled()) { - LOG.debug(System.currentTimeMillis() + " Overloaded is " + LOG.debug(System.currentTimeMillis() + " [MAP-LOAD] Overloaded is " + Boolean.TRUE.toString() + " MapSlotsBackfill is " - + loadStatus.mapSlotsBackfill); + + loadStatus.getMapLoad()); } return; // stop calculation because we know it is overloaded. } float incompleteReduceTasks = 0; // include pending & running reduce tasks. for (JobStats job : ClusterStats.getRunningJobStats()) { - int noOfReduces = job.getJob().getNumReduceTasks(); + // Cached the num-reds value in JobStats + int noOfReduces = job.getNoOfReds(); if (noOfReduces > 0) { float reduceProgress = job.getJob().reduceProgress(); incompleteReduceTasks += - calcEffectiveIncompleteReduceTasks( - clusterStatus.getMaxReduceTasks(), noOfReduces, reduceProgress); + calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces, + reduceProgress); } } - loadStatus.reduceSlotsBackfill = - (int) ((overloadReduceTaskReduceSlotRatio * clusterStatus.getMaxReduceTasks()) + + int reduceSlotsBackFill = + (int)((overloadReduceTaskReduceSlotRatio * reduceCapacity) - incompleteReduceTasks); - if (loadStatus.reduceSlotsBackfill <= 0) { + loadStatus.updateReduceLoad(reduceSlotsBackFill); + if (loadStatus.getReduceLoad() <= 0) { if (LOG.isDebugEnabled()) { - LOG.debug(System.currentTimeMillis() + " Overloaded is " + LOG.debug(System.currentTimeMillis() + " [REDUCE-LOAD] Overloaded is " + Boolean.TRUE.toString() + " ReduceSlotsBackfill is " - + loadStatus.reduceSlotsBackfill); + + loadStatus.getReduceLoad()); } return; // stop calculation because we know it is overloaded. } if (LOG.isDebugEnabled()) { - LOG.debug(System.currentTimeMillis() + " Overloaded is " + LOG.debug(System.currentTimeMillis() + " [OVERALL] Overloaded is " + Boolean.FALSE.toString() + "Current load Status is " + loadStatus); } } static class LoadStatus { - int mapSlotsBackfill; - int mapSlotCapacity; - int reduceSlotsBackfill; - int reduceSlotCapacity; - int numJobsBackfill; + /** + * Additional number of map slots that can be requested before + * declaring (by Gridmix STRESS mode) the cluster as overloaded. + */ + private volatile int mapSlotsBackfill; + + /** + * Determines the total map slot capacity of the cluster. + */ + private volatile int mapSlotCapacity; + + /** + * Additional number of reduce slots that can be requested before + * declaring (by Gridmix STRESS mode) the cluster as overloaded. + */ + private volatile int reduceSlotsBackfill; + + /** + * Determines the total reduce slot capacity of the cluster. + */ + private volatile int reduceSlotCapacity; + + /** + * Determines the max count of running jobs in the cluster. + */ + private volatile int numJobsBackfill; + + // set the default to true + private AtomicBoolean overloaded = new AtomicBoolean(true); /** * Construct the LoadStatus in an unknown state - assuming the cluster is @@ -339,12 +380,76 @@ static class LoadStatus { reduceSlotCapacity = -1; } - public boolean overloaded() { - return (mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0) - || (numJobsBackfill <= 0); + public synchronized int getMapLoad() { + return mapSlotsBackfill; } - public String toString() { + public synchronized int getMapCapacity() { + return mapSlotCapacity; + } + + public synchronized int getReduceLoad() { + return reduceSlotsBackfill; + } + + public synchronized int getReduceCapacity() { + return reduceSlotCapacity; + } + + public synchronized int getJobLoad() { + return numJobsBackfill; + } + + public synchronized void decrementMapLoad(int mapSlotsConsumed) { + this.mapSlotsBackfill -= mapSlotsConsumed; + updateOverloadStatus(); + } + + public synchronized void decrementReduceLoad(int reduceSlotsConsumed) { + this.reduceSlotsBackfill -= reduceSlotsConsumed; + updateOverloadStatus(); + } + + public synchronized void decrementJobLoad(int numJobsConsumed) { + this.numJobsBackfill -= numJobsConsumed; + updateOverloadStatus(); + } + + public synchronized void updateMapCapacity(int mapSlotsCapacity) { + this.mapSlotCapacity = mapSlotsCapacity; + updateOverloadStatus(); + } + + public synchronized void updateReduceCapacity(int reduceSlotsCapacity) { + this.reduceSlotCapacity = reduceSlotsCapacity; + updateOverloadStatus(); + } + + public synchronized void updateMapLoad(int mapSlotsBackfill) { + this.mapSlotsBackfill = mapSlotsBackfill; + updateOverloadStatus(); + } + + public synchronized void updateReduceLoad(int reduceSlotsBackfill) { + this.reduceSlotsBackfill = reduceSlotsBackfill; + updateOverloadStatus(); + } + + public synchronized void updateJobLoad(int numJobsBackfill) { + this.numJobsBackfill = numJobsBackfill; + updateOverloadStatus(); + } + + private synchronized void updateOverloadStatus() { + overloaded.set((mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0) + || (numJobsBackfill <= 0)); + } + + public synchronized boolean overloaded() { + return overloaded.get(); + } + + public synchronized String toString() { // TODO Use StringBuilder instead return " Overloaded = " + overloaded() + ", MapSlotBackfill = " + mapSlotsBackfill diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java index 802745522f..22b742678f 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java @@ -101,10 +101,17 @@ public TestMonitor(int expected, Statistics stats) { retiredJobs = new LinkedBlockingQueue(); } - public void verify(ArrayList submitted) throws Exception { + public void verify(ArrayList submitted, Configuration clientConf) + throws Exception { final ArrayList succeeded = new ArrayList(); assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded)); final HashMap sub = new HashMap(); + + // define the input and output path for the run + final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs); + final Path out = + new Path(in, clientConf.get(Gridmix.GRIDMIX_OUT_DIR, "gridmix")); + for (JobStory spec : submitted) { sub.put(spec.getJobID().toString(), spec); } @@ -115,8 +122,7 @@ public void verify(ArrayList submitted) throws Exception { Configuration conf = job.getConfiguration(); if (GenerateData.JOB_NAME.equals(jobName)) { verifyQueue(conf, jobName); - final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs); - final Path out = new Path("/gridmix").makeQualified(GridmixTestUtils.dfs); + final ContentSummary generated = GridmixTestUtils.dfs.getContentSummary(in); assertTrue("Mismatched data gen", // +/- 100k for logs (GENDATA << 20) < generated.getLength() + GENSLOP || @@ -164,7 +170,7 @@ public void verify(ArrayList submitted) throws Exception { final FileStatus stat = GridmixTestUtils.dfs.getFileStatus( - new Path(GridmixTestUtils.DEST, "" + Integer.valueOf(jobSeqNum))); + new Path(out, "" + Integer.valueOf(jobSeqNum))); assertEquals("Wrong owner for " + jobName, spec.getUser(), stat.getOwner()); @@ -337,8 +343,9 @@ static class DebugGridmix extends Gridmix { private JobFactory factory; private TestMonitor monitor; - public void checkMonitor() throws Exception { - monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted()); + public void checkMonitor(Configuration conf) throws Exception { + monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted(), + conf); } @Override @@ -534,9 +541,11 @@ private void doSubmission(boolean useDefaultQueue, GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777)); int res = ToolRunner.run(conf, client, argv); assertEquals("Client exited with nonzero status", 0, res); - client.checkMonitor(); + client.checkMonitor(conf); } catch (Exception e) { e.printStackTrace(); + // fail the test if there is an exception + throw new RuntimeException(e); } finally { in.getFileSystem(conf).delete(in, true); out.getFileSystem(conf).delete(out, true); diff --git a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java index 3199c42efd..64af603bec 100644 --- a/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java +++ b/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java @@ -338,7 +338,7 @@ public boolean isSuccessful() throws IOException, InterruptedException { return isSuccessful; }; }; - return new JobStats(numMaps, fakeJob); + return new JobStats(numMaps, numReds, fakeJob); } /**