MAPREDUCE-6228. Add truncate operation to SLive. Constributed by Plamen Jeliazkov.

This commit is contained in:
Plamen Jeliazkov 2015-02-19 00:02:49 -08:00 committed by Konstantin V Shvachko
parent 946456c6d8
commit a19820f2fb
9 changed files with 255 additions and 4 deletions

View File

@ -248,6 +248,10 @@ Release 2.7.0 - UNRELEASED
NEW FEATURES
MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
MAPREDUCE-6228. Add truncate operation to SLive. (Plamen Jeliazkov via shv)
IMPROVEMENTS
MAPREDUCE-6149. Document override log4j.properties in MR job.
@ -284,8 +288,6 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors
(aajisaka)
MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
MAPREDUCE-6253. Update use of Iterator to Iterable. (Ray Chiang via devaraj)
MAPREDUCE-5335. Rename Job Tracker terminology in ShuffleSchedulerImpl.

View File

@ -144,6 +144,7 @@ private Options getOptions() {
cliopt.addOption(ConfigOption.DURATION);
cliopt.addOption(ConfigOption.EXIT_ON_ERROR);
cliopt.addOption(ConfigOption.SLEEP_TIME);
cliopt.addOption(ConfigOption.TRUNCATE_WAIT);
cliopt.addOption(ConfigOption.FILES);
cliopt.addOption(ConfigOption.DIR_SIZE);
cliopt.addOption(ConfigOption.BASE_DIR);
@ -167,6 +168,7 @@ private Options getOptions() {
cliopt.addOption(ConfigOption.READ_SIZE);
cliopt.addOption(ConfigOption.WRITE_SIZE);
cliopt.addOption(ConfigOption.APPEND_SIZE);
cliopt.addOption(ConfigOption.TRUNCATE_SIZE);
cliopt.addOption(ConfigOption.RANDOM_SEED);
cliopt.addOption(ConfigOption.QUEUE_NAME);
cliopt.addOption(ConfigOption.HELP);

View File

@ -130,6 +130,32 @@ boolean shouldExitOnFirstError(String primary) {
return Boolean.parseBoolean(val);
}
/**
* @return whether the mapper or reducer should wait for truncate recovery
*/
boolean shouldWaitOnTruncate() {
return shouldWaitOnTruncate(null);
}
/**
* @param primary
* primary the initial string to be used for the value of this
* configuration option (if not provided then config and then the
* default are used)
*
* @return whether the mapper or reducer should wait for truncate recovery
*/
boolean shouldWaitOnTruncate(String primary) {
String val = primary;
if (val == null) {
val = config.get(ConfigOption.EXIT_ON_ERROR.getCfgOption());
}
if (val == null) {
return ConfigOption.EXIT_ON_ERROR.getDefault();
}
return Boolean.parseBoolean(val);
}
/**
* @return the number of reducers to use
*/
@ -533,6 +559,24 @@ Range<Long> getAppendSize() {
return getAppendSize(null);
}
/**
* @param primary
* the initial string to be used for the value of this configuration
* option (if not provided then config and then the default are used)
* @return the truncate byte size range (or null if none)
*/
Range<Long> getTruncateSize(String primary) {
return getMinMaxBytes(ConfigOption.TRUNCATE_SIZE, primary);
}
/**
* @return the truncate byte size range (or null if none) using config and
* default for lookup
*/
Range<Long> getTruncateSize() {
return getTruncateSize(null);
}
/**
* @param primary
* the initial string to be used for the value of this configuration
@ -599,6 +643,21 @@ boolean shouldAppendUseBlockSize() {
return false;
}
/**
* Returns whether the truncate range should use the block size range
*
* @return true|false
*/
boolean shouldTruncateUseBlockSize() {
Range<Long> truncateRange = getTruncateSize();
if (truncateRange == null
|| (truncateRange.getLower() == truncateRange.getUpper()
&& (truncateRange.getUpper() == Long.MAX_VALUE))) {
return true;
}
return false;
}
/**
* Returns whether the read range should use the entire file
*

View File

@ -282,6 +282,18 @@ private Configuration handleOptions(ParsedOutput opts, Configuration base)
"Error extracting & merging exit on error value", e);
}
}
// overwrite the truncate wait setting
{
try {
boolean waitOnTruncate = extractor.shouldWaitOnTruncate(opts
.getValue(ConfigOption.TRUNCATE_WAIT.getOpt()));
base.setBoolean(ConfigOption.TRUNCATE_WAIT.getCfgOption(),
waitOnTruncate);
} catch (Exception e) {
throw new ConfigException(
"Error extracting & merging wait on truncate value", e);
}
}
// verify and set file limit and ensure > 0
{
Integer fileAm = null;
@ -553,6 +565,29 @@ private Configuration handleOptions(ParsedOutput opts, Configuration base)
.set(ConfigOption.APPEND_SIZE.getCfgOption(), appendSize.toString());
}
}
// set the truncate size range
{
Range<Long> truncateSize = null;
try {
truncateSize = extractor.getTruncateSize(opts
.getValue(ConfigOption.TRUNCATE_SIZE.getOpt()));
} catch (Exception e) {
throw new ConfigException(
"Error extracting & merging truncate size range", e);
}
if (truncateSize != null) {
if (truncateSize.getLower() > truncateSize.getUpper()) {
throw new ConfigException(
"Truncate size minimum is greater than its maximum");
}
if (truncateSize.getLower() < 0) {
throw new ConfigException(
"Truncate size minimum must be greater than or equal to zero");
}
base
.set(ConfigOption.TRUNCATE_SIZE.getCfgOption(), truncateSize.toString());
}
}
// set the seed
{
Long seed = null;

View File

@ -97,6 +97,15 @@ class ConfigOption<T> extends Option {
"Min,max for size to append (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
+ ".op.append.size", null);
static final ConfigOption<Boolean> TRUNCATE_WAIT = new ConfigOption<Boolean>(
"truncateWait", true, "Should wait for truncate recovery", SLIVE_PREFIX
+ ".op.truncate.wait", true);
static final ConfigOption<Long> TRUNCATE_SIZE = new ConfigOption<Long>(
"truncateSize", true,
"Min,max for size to truncate (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
+ ".op.truncate.size", null);
static final ConfigOption<Long> RANDOM_SEED = new ConfigOption<Long>(
"seed", true, "Random number seed", SLIVE_PREFIX + ".seed", null);

View File

@ -45,7 +45,7 @@ String lowerName() {
* Allowed operation types
*/
enum OperationType {
READ, APPEND, RENAME, LS, MKDIR, DELETE, CREATE;
READ, APPEND, RENAME, LS, MKDIR, DELETE, CREATE, TRUNCATE;
String lowerName() {
return this.name().toLowerCase(Locale.ENGLISH);
}
@ -53,7 +53,7 @@ String lowerName() {
// program info
static final String PROG_NAME = SliveTest.class.getSimpleName();
static final String PROG_VERSION = "0.0.2";
static final String PROG_VERSION = "0.1.0";
// useful constants
static final int MEGABYTES = 1048576;

View File

@ -75,6 +75,9 @@ Operation getOperation(OperationType type) {
case CREATE:
op = new CreateOp(this.config, rnd);
break;
case TRUNCATE:
op = new TruncateOp(this.config, rnd);
break;
}
typedOperations.put(type, op);
return op;

View File

@ -136,6 +136,8 @@ private String[] getTestArgs(boolean sleep) {
args.add("10");
args.add("-" + ConfigOption.FILES.getOpt());
args.add("10");
args.add("-" + ConfigOption.TRUNCATE_SIZE.getOpt());
args.add("0,1M");
}
return args.toArray(new String[args.size()]);
}
@ -237,6 +239,9 @@ public void testArguments() throws Exception {
Range<Long> wRange = extractor.getWriteSize();
assertEquals(wRange.getLower().intValue(), Constants.MEGABYTES * 1);
assertEquals(wRange.getUpper().intValue(), Constants.MEGABYTES * 2);
Range<Long> trRange = extractor.getTruncateSize();
assertEquals(trRange.getLower().intValue(), 0);
assertEquals(trRange.getUpper().intValue(), Constants.MEGABYTES * 1);
Range<Long> bRange = extractor.getBlockSize();
assertEquals(bRange.getLower().intValue(), Constants.MEGABYTES * 1);
assertEquals(bRange.getUpper().intValue(), Constants.MEGABYTES * 2);
@ -534,4 +539,26 @@ protected Path getAppendFile() {
};
runOperationOk(extractor, aop, false);
}
@Test
public void testTruncateOp() throws Exception {
// setup a valid config
ConfigExtractor extractor = getTestConfig(false);
// ensure file created before append
final Path fn = new Path(getTestFile().getCanonicalPath());
CreateOp op = new CreateOp(extractor, rnd) {
protected Path getCreateFile() {
return fn;
}
};
runOperationOk(extractor, op, true);
// local file system (ChecksumFileSystem) currently doesn't support truncate -
// but we'll leave this test here anyways but can't check the results..
TruncateOp top = new TruncateOp(extractor, rnd) {
protected Path getTruncateFile() {
return fn;
}
};
runOperationOk(extractor, top, false);
}
}

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.slive;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
/**
* Operation which selects a random file and truncates a random amount of bytes
* (selected from the configuration for truncate size) from that file,
* if it exists.
*
* This operation will capture statistics on success for bytes written, time
* taken (milliseconds), and success count and on failure it will capture the
* number of failures and the time taken (milliseconds) to fail.
*/
class TruncateOp extends Operation {
private static final Log LOG = LogFactory.getLog(TruncateOp.class);
TruncateOp(ConfigExtractor cfg, Random rnd) {
super(TruncateOp.class.getSimpleName(), cfg, rnd);
}
/**
* Gets the file to truncate from
*
* @return Path
*/
protected Path getTruncateFile() {
Path fn = getFinder().getFile();
return fn;
}
@Override // Operation
List<OperationOutput> run(FileSystem fs) {
List<OperationOutput> out = super.run(fs);
try {
Path fn = getTruncateFile();
boolean waitOnTruncate = getConfig().shouldWaitOnTruncate();
long currentSize = fs.getFileStatus(fn).getLen();
// determine file status for file length requirement
// to know if should fill in partial bytes
Range<Long> truncateSizeRange = getConfig().getTruncateSize();
if (getConfig().shouldTruncateUseBlockSize()) {
truncateSizeRange = getConfig().getBlockSize();
}
long truncateSize = Math.max(0L,
currentSize - Range.betweenPositive(getRandom(), truncateSizeRange));
long timeTaken = 0;
LOG.info("Attempting to truncate file at " + fn + " to size "
+ Helper.toByteInfo(truncateSize));
{
// truncate
long startTime = Timer.now();
boolean completed = fs.truncate(fn, truncateSize);
if(!completed && waitOnTruncate)
waitForRecovery(fs, fn, truncateSize);
timeTaken += Timer.elapsed(startTime);
}
out.add(new OperationOutput(OutputType.LONG, getType(),
ReportWriter.BYTES_WRITTEN, 0));
out.add(new OperationOutput(OutputType.LONG, getType(),
ReportWriter.OK_TIME_TAKEN, timeTaken));
out.add(new OperationOutput(OutputType.LONG, getType(),
ReportWriter.SUCCESSES, 1L));
LOG.info("Truncate file " + fn + " to " + Helper.toByteInfo(truncateSize)
+ " in " + timeTaken + " milliseconds");
} catch (FileNotFoundException e) {
out.add(new OperationOutput(OutputType.LONG, getType(),
ReportWriter.NOT_FOUND, 1L));
LOG.warn("Error with truncating", e);
} catch (IOException e) {
out.add(new OperationOutput(OutputType.LONG, getType(),
ReportWriter.FAILURES, 1L));
LOG.warn("Error with truncating", e);
}
return out;
}
private void waitForRecovery(FileSystem fs, Path fn, long newLength)
throws IOException {
LOG.info("Waiting on truncate file recovery for " + fn);
for(;;) {
FileStatus stat = fs.getFileStatus(fn);
if(stat.getLen() == newLength) break;
try {Thread.sleep(1000);} catch(InterruptedException ignored) {}
}
}
}