MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
This commit is contained in:
parent
350b520c64
commit
ef01768333
@ -274,6 +274,8 @@ Release 2.7.0 - UNRELEASED
|
||||
MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors
|
||||
(aajisaka)
|
||||
|
||||
MAPREDUCE-6227. DFSIO for truncate. (shv via yliu)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-6169. MergeQueue should release reference to the current item
|
||||
|
@ -31,7 +31,6 @@
|
||||
import java.util.Date;
|
||||
import java.util.Random;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -92,13 +91,13 @@ public class TestDFSIO implements Tool {
|
||||
private static final String BASE_FILE_NAME = "test_io_";
|
||||
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
|
||||
private static final long MEGA = ByteMultiple.MB.value();
|
||||
private static final int DEFAULT_NR_BYTES = 1;
|
||||
private static final int DEFAULT_NR_BYTES = 128;
|
||||
private static final int DEFAULT_NR_FILES = 4;
|
||||
private static final String USAGE =
|
||||
"Usage: " + TestDFSIO.class.getSimpleName() +
|
||||
" [genericOptions]" +
|
||||
" -read [-random | -backward | -skip [-skipSize Size]] |" +
|
||||
" -write | -append | -clean" +
|
||||
" -write | -append | -truncate | -clean" +
|
||||
" [-compression codecClassName]" +
|
||||
" [-nrFiles N]" +
|
||||
" [-size Size[B|KB|MB|GB|TB]]" +
|
||||
@ -120,7 +119,8 @@ private static enum TestType {
|
||||
TEST_TYPE_APPEND("append"),
|
||||
TEST_TYPE_READ_RANDOM("random read"),
|
||||
TEST_TYPE_READ_BACKWARD("backward read"),
|
||||
TEST_TYPE_READ_SKIP("skip read");
|
||||
TEST_TYPE_READ_SKIP("skip read"),
|
||||
TEST_TYPE_TRUNCATE("truncate");
|
||||
|
||||
private String type;
|
||||
|
||||
@ -191,6 +191,9 @@ private static Path getAppendDir(Configuration conf) {
|
||||
private static Path getRandomReadDir(Configuration conf) {
|
||||
return new Path(getBaseDir(conf), "io_random_read");
|
||||
}
|
||||
private static Path getTruncateDir(Configuration conf) {
|
||||
return new Path(getBaseDir(conf), "io_truncate");
|
||||
}
|
||||
private static Path getDataDir(Configuration conf) {
|
||||
return new Path(getBaseDir(conf), "io_data");
|
||||
}
|
||||
@ -201,6 +204,7 @@ private static Path getDataDir(Configuration conf) {
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
bench = new TestDFSIO();
|
||||
bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
cluster = new MiniDFSCluster.Builder(bench.getConf())
|
||||
.numDataNodes(2)
|
||||
.format(true)
|
||||
@ -277,6 +281,16 @@ public void testAppend() throws Exception {
|
||||
bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime);
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testTruncate() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
bench.createControlFile(fs, DEFAULT_NR_BYTES / 2, DEFAULT_NR_FILES);
|
||||
long tStart = System.currentTimeMillis();
|
||||
bench.truncateTest(fs);
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
bench.analyzeResult(fs, TestType.TEST_TYPE_TRUNCATE, execTime);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void createControlFile(FileSystem fs,
|
||||
long nrBytes, // in bytes
|
||||
@ -299,9 +313,9 @@ private void createControlFile(FileSystem fs,
|
||||
} catch(Exception e) {
|
||||
throw new IOException(e.getLocalizedMessage());
|
||||
} finally {
|
||||
if (writer != null)
|
||||
if (writer != null)
|
||||
writer.close();
|
||||
writer = null;
|
||||
writer = null;
|
||||
}
|
||||
}
|
||||
LOG.info("created control files for: "+nrFiles+" files");
|
||||
@ -611,6 +625,51 @@ private void randomReadTest(FileSystem fs) throws IOException {
|
||||
runIOTest(RandomReadMapper.class, readDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Truncate mapper class.
|
||||
* The mapper truncates given file to the newLength, specified by -size.
|
||||
*/
|
||||
public static class TruncateMapper extends IOStatMapper {
|
||||
private static final long DELAY = 100L;
|
||||
|
||||
private Path filePath;
|
||||
private long fileSize;
|
||||
|
||||
@Override // IOMapperBase
|
||||
public Closeable getIOStream(String name) throws IOException {
|
||||
filePath = new Path(getDataDir(getConf()), name);
|
||||
fileSize = fs.getFileStatus(filePath).getLen();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override // IOMapperBase
|
||||
public Long doIO(Reporter reporter,
|
||||
String name,
|
||||
long newLength // in bytes
|
||||
) throws IOException {
|
||||
boolean isClosed = fs.truncate(filePath, newLength);
|
||||
reporter.setStatus("truncating " + name + " to newLength " +
|
||||
newLength + " ::host = " + hostName);
|
||||
for(int i = 0; !isClosed; i++) {
|
||||
try {
|
||||
Thread.sleep(DELAY);
|
||||
} catch (InterruptedException ignored) {}
|
||||
FileStatus status = fs.getFileStatus(filePath);
|
||||
assert status != null : "status is null";
|
||||
isClosed = (status.getLen() == newLength);
|
||||
reporter.setStatus("truncate recover for " + name + " to newLength " +
|
||||
newLength + " attempt " + i + " ::host = " + hostName);
|
||||
}
|
||||
return Long.valueOf(fileSize - newLength);
|
||||
}
|
||||
}
|
||||
|
||||
private void truncateTest(FileSystem fs) throws IOException {
|
||||
Path TruncateDir = getTruncateDir(config);
|
||||
fs.delete(TruncateDir, true);
|
||||
runIOTest(TruncateMapper.class, TruncateDir);
|
||||
}
|
||||
|
||||
private void sequentialTest(FileSystem fs,
|
||||
TestType testType,
|
||||
long fileSize, // in bytes
|
||||
@ -632,6 +691,9 @@ private void sequentialTest(FileSystem fs,
|
||||
case TEST_TYPE_READ_SKIP:
|
||||
ioer = new RandomReadMapper();
|
||||
break;
|
||||
case TEST_TYPE_TRUNCATE:
|
||||
ioer = new TruncateMapper();
|
||||
break;
|
||||
default:
|
||||
return;
|
||||
}
|
||||
@ -665,7 +727,7 @@ public int run(String[] args) throws IOException {
|
||||
String resFileName = DEFAULT_RES_FILE_NAME;
|
||||
String compressionClass = null;
|
||||
boolean isSequential = false;
|
||||
String version = TestDFSIO.class.getSimpleName() + ".1.7";
|
||||
String version = TestDFSIO.class.getSimpleName() + ".1.8";
|
||||
|
||||
LOG.info(version);
|
||||
if (args.length == 0) {
|
||||
@ -689,6 +751,8 @@ public int run(String[] args) throws IOException {
|
||||
} else if (args[i].equalsIgnoreCase("-skip")) {
|
||||
if (testType != TestType.TEST_TYPE_READ) return -1;
|
||||
testType = TestType.TEST_TYPE_READ_SKIP;
|
||||
} else if (args[i].equalsIgnoreCase("-truncate")) {
|
||||
testType = TestType.TEST_TYPE_TRUNCATE;
|
||||
} else if (args[i].equalsIgnoreCase("-clean")) {
|
||||
testType = TestType.TEST_TYPE_CLEANUP;
|
||||
} else if (args[i].toLowerCase().startsWith("-seq")) {
|
||||
@ -762,6 +826,11 @@ else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
|
||||
case TEST_TYPE_READ_BACKWARD:
|
||||
case TEST_TYPE_READ_SKIP:
|
||||
randomReadTest(fs);
|
||||
break;
|
||||
case TEST_TYPE_TRUNCATE:
|
||||
truncateTest(fs);
|
||||
break;
|
||||
default:
|
||||
}
|
||||
long execTime = System.currentTimeMillis() - tStart;
|
||||
|
||||
@ -797,7 +866,7 @@ static float toMB(long bytes) {
|
||||
return ((float)bytes)/MEGA;
|
||||
}
|
||||
|
||||
private void analyzeResult( FileSystem fs,
|
||||
private void analyzeResult( FileSystem fs,
|
||||
TestType testType,
|
||||
long execTime,
|
||||
String resFileName
|
||||
@ -870,13 +939,17 @@ private Path getReduceFilePath(TestType testType) {
|
||||
case TEST_TYPE_READ_BACKWARD:
|
||||
case TEST_TYPE_READ_SKIP:
|
||||
return new Path(getRandomReadDir(config), "part-00000");
|
||||
case TEST_TYPE_TRUNCATE:
|
||||
return new Path(getTruncateDir(config), "part-00000");
|
||||
default:
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void analyzeResult(FileSystem fs, TestType testType, long execTime)
|
||||
throws IOException {
|
||||
analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME);
|
||||
String dir = System.getProperty("test.build.dir", "target/test-dir");
|
||||
analyzeResult(fs, testType, execTime, dir + "/" + DEFAULT_RES_FILE_NAME);
|
||||
}
|
||||
|
||||
private void cleanup(FileSystem fs)
|
||||
|
Loading…
Reference in New Issue
Block a user