HADOOP-17628. Distcp contract test is really slow with ABFS and S3A; timing out. (#3240)
This patch cuts down the size of directory trees used for distcp contract tests against object stores, so making them much faster against distant/slow stores. On abfs, the test only runs with -Dscale (as was the case for s3a already), and has the larger scale test timeout. After every test case, the FileSystem IOStatistics are logged, to provide information about what IO is taking place and what it's performance is. There are some test cases which upload files of 1+ MiB; you can increase the size of the upload in the option "scale.test.distcp.file.size.kb" Set it to zero and the large file tests are skipped. Contributed by Steve Loughran.
This commit is contained in:
parent
efb3fa2bf5
commit
ee466d4b40
@ -943,8 +943,8 @@ public static List<Path> createFiles(final FileSystem fs,
|
|||||||
final int fileCount,
|
final int fileCount,
|
||||||
final int dirCount) throws IOException {
|
final int dirCount) throws IOException {
|
||||||
return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
|
return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
|
||||||
new ArrayList<Path>(fileCount),
|
new ArrayList<>(fileCount),
|
||||||
new ArrayList<Path>(dirCount));
|
new ArrayList<>(dirCount));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -18,17 +18,12 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.fs.contract.s3a;
|
package org.apache.hadoop.fs.contract.s3a;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
|
import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.StorageStatistics;
|
import org.apache.hadoop.fs.StorageStatistics;
|
||||||
import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
|
|
||||||
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
|
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -59,42 +54,29 @@ protected Configuration createConfiguration() {
|
|||||||
return newConf;
|
return newConf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean shouldUseDirectWrite() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected S3AContract createContract(Configuration conf) {
|
protected S3AContract createContract(Configuration conf) {
|
||||||
return new S3AContract(conf);
|
return new S3AContract(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Always inject the delay path in, so if the destination is inconsistent,
|
|
||||||
* and uses this key, inconsistency triggered.
|
|
||||||
* @param filepath path string in
|
|
||||||
* @return path on the remote FS for distcp
|
|
||||||
* @throws IOException IO failure
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
protected Path path(final String filepath) throws IOException {
|
public void testDistCpWithIterator() throws Exception {
|
||||||
Path path = super.path(filepath);
|
|
||||||
return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void testDirectWrite() throws Exception {
|
|
||||||
final long renames = getRenameOperationCount();
|
final long renames = getRenameOperationCount();
|
||||||
super.testDirectWrite();
|
super.testDistCpWithIterator();
|
||||||
assertEquals("Expected no renames for a direct write distcp", 0L,
|
assertEquals("Expected no renames for a direct write distcp",
|
||||||
getRenameOperationCount() - renames);
|
getRenameOperationCount(),
|
||||||
|
renames);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void testNonDirectWrite() throws Exception {
|
public void testNonDirectWrite() throws Exception {
|
||||||
final long renames = getRenameOperationCount();
|
final long renames = getRenameOperationCount();
|
||||||
try {
|
super.testNonDirectWrite();
|
||||||
super.testNonDirectWrite();
|
|
||||||
} catch (FileNotFoundException e) {
|
|
||||||
// We may get this exception when data is written to a DELAY_LISTING_ME
|
|
||||||
// directory causing verification of the distcp success to fail if
|
|
||||||
// S3Guard is not enabled
|
|
||||||
}
|
|
||||||
assertEquals("Expected 2 renames for a non-direct write distcp", 2L,
|
assertEquals("Expected 2 renames for a non-direct write distcp", 2L,
|
||||||
getRenameOperationCount() - renames);
|
getRenameOperationCount() - renames);
|
||||||
}
|
}
|
||||||
|
@ -19,16 +19,24 @@
|
|||||||
package org.apache.hadoop.fs.azurebfs.contract;
|
package org.apache.hadoop.fs.azurebfs.contract;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||||
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
|
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contract test for distCp operation.
|
* Contract test for distCp operation.
|
||||||
*/
|
*/
|
||||||
public class ITestAbfsFileSystemContractDistCp extends AbstractContractDistCpTest {
|
public class ITestAbfsFileSystemContractDistCp extends AbstractContractDistCpTest {
|
||||||
private final ABFSContractTestBinding binding;
|
private final ABFSContractTestBinding binding;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getTestTimeoutMillis() {
|
||||||
|
return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
|
||||||
|
}
|
||||||
|
|
||||||
public ITestAbfsFileSystemContractDistCp() throws Exception {
|
public ITestAbfsFileSystemContractDistCp() throws Exception {
|
||||||
binding = new ABFSContractTestBinding();
|
binding = new ABFSContractTestBinding();
|
||||||
Assume.assumeTrue(binding.getAuthType() != AuthType.OAuth);
|
Assume.assumeTrue(binding.getAuthType() != AuthType.OAuth);
|
||||||
@ -38,6 +46,7 @@ public ITestAbfsFileSystemContractDistCp() throws Exception {
|
|||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
binding.setup();
|
binding.setup();
|
||||||
super.setup();
|
super.setup();
|
||||||
|
assumeScaleTestsEnabled(binding.getRawConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,49 +0,0 @@
|
|||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.fs.azurebfs.contract;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Contract test for secure distCP operation.
|
|
||||||
*/
|
|
||||||
public class ITestAbfsFileSystemContractSecureDistCp extends AbstractContractDistCpTest {
|
|
||||||
private final ABFSContractTestBinding binding;
|
|
||||||
|
|
||||||
public ITestAbfsFileSystemContractSecureDistCp() throws Exception {
|
|
||||||
binding = new ABFSContractTestBinding();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setup() throws Exception {
|
|
||||||
binding.setup();
|
|
||||||
super.setup();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Configuration createConfiguration() {
|
|
||||||
return binding.getRawConfiguration();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected AbfsFileSystemContract createContract(Configuration conf) {
|
|
||||||
return new AbfsFileSystemContract(conf, true);
|
|
||||||
}
|
|
||||||
}
|
|
@ -18,7 +18,9 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.tools.contract;
|
package org.apache.hadoop.tools.contract;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
||||||
|
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
|
||||||
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_DISTCP_JOB_ID;
|
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_DISTCP_JOB_ID;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -77,6 +79,22 @@ public abstract class AbstractContractDistCpTest
|
|||||||
|
|
||||||
protected static final int MB = 1024 * 1024;
|
protected static final int MB = 1024 * 1024;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default depth for a directory tree: {@value}.
|
||||||
|
*/
|
||||||
|
protected static final int DEFAULT_DEPTH = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default width for a directory tree: {@value}.
|
||||||
|
* Total dir size is
|
||||||
|
* <pre>
|
||||||
|
* DEFAULT_WITH^DEFAULT_DEPTH
|
||||||
|
* </pre>
|
||||||
|
* So the duration of a test run grows rapidly with this value.
|
||||||
|
* This has very significant consequences for object storage runs.
|
||||||
|
*/
|
||||||
|
protected static final int DEFAULT_WIDTH = 2;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName testName = new TestName();
|
public TestName testName = new TestName();
|
||||||
|
|
||||||
@ -154,13 +172,20 @@ public void setup() throws Exception {
|
|||||||
localDir =
|
localDir =
|
||||||
localFS.makeQualified(new Path(new Path(
|
localFS.makeQualified(new Path(new Path(
|
||||||
GenericTestUtils.getTestDir().toURI()), testSubDir + "/local"));
|
GenericTestUtils.getTestDir().toURI()), testSubDir + "/local"));
|
||||||
|
localFS.delete(localDir, true);
|
||||||
mkdirs(localFS, localDir);
|
mkdirs(localFS, localDir);
|
||||||
remoteDir = path(testSubDir + "/remote");
|
Path testSubPath = path(testSubDir);
|
||||||
mkdirs(remoteFS, remoteDir);
|
remoteDir = new Path(testSubPath, "remote");
|
||||||
// test teardown does this, but IDE-based test debugging can skip
|
// test teardown does this, but IDE-based test debugging can skip
|
||||||
// that teardown; this guarantees the initial state is clean
|
// that teardown; this guarantees the initial state is clean
|
||||||
remoteFS.delete(remoteDir, true);
|
remoteFS.delete(remoteDir, true);
|
||||||
localFS.delete(localDir, true);
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
// if remote FS supports IOStatistics log it.
|
||||||
|
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, getRemoteFS());
|
||||||
|
super.teardown();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -325,6 +350,7 @@ private Job distCpUpdate(final Path srcDir, final Path destDir)
|
|||||||
.withDeleteMissing(true)
|
.withDeleteMissing(true)
|
||||||
.withSyncFolder(true)
|
.withSyncFolder(true)
|
||||||
.withCRC(true)
|
.withCRC(true)
|
||||||
|
.withDirectWrite(shouldUseDirectWrite())
|
||||||
.withOverwrite(false)));
|
.withOverwrite(false)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -378,6 +404,7 @@ public void testTrackDeepDirectoryStructureToRemote() throws Exception {
|
|||||||
inputDirUnderOutputDir)
|
inputDirUnderOutputDir)
|
||||||
.withTrackMissing(trackDir)
|
.withTrackMissing(trackDir)
|
||||||
.withSyncFolder(true)
|
.withSyncFolder(true)
|
||||||
|
.withDirectWrite(shouldUseDirectWrite())
|
||||||
.withOverwrite(false)));
|
.withOverwrite(false)));
|
||||||
|
|
||||||
lsR("tracked udpate", remoteFS, destDir);
|
lsR("tracked udpate", remoteFS, destDir);
|
||||||
@ -476,7 +503,7 @@ public void testSetJobId() throws Exception {
|
|||||||
remoteFS.create(new Path(remoteDir, "file1")).close();
|
remoteFS.create(new Path(remoteDir, "file1")).close();
|
||||||
DistCpTestUtils
|
DistCpTestUtils
|
||||||
.assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(),
|
.assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(),
|
||||||
localDir.toString(), null, conf);
|
localDir.toString(), getDefaultCLIOptionsOrNull(), conf);
|
||||||
assertNotNull("DistCp job id isn't set",
|
assertNotNull("DistCp job id isn't set",
|
||||||
conf.get(CONF_LABEL_DISTCP_JOB_ID));
|
conf.get(CONF_LABEL_DISTCP_JOB_ID));
|
||||||
}
|
}
|
||||||
@ -532,13 +559,15 @@ private Path distCpDeepDirectoryStructure(FileSystem srcFS,
|
|||||||
*/
|
*/
|
||||||
private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS,
|
private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS,
|
||||||
Path dstDir) throws Exception {
|
Path dstDir) throws Exception {
|
||||||
|
int fileSizeKb = conf.getInt(SCALE_TEST_DISTCP_FILE_SIZE_KB,
|
||||||
|
getDefaultDistCPSizeKb());
|
||||||
|
if (fileSizeKb < 1) {
|
||||||
|
skip("File size in " + SCALE_TEST_DISTCP_FILE_SIZE_KB + " is zero");
|
||||||
|
}
|
||||||
initPathFields(srcDir, dstDir);
|
initPathFields(srcDir, dstDir);
|
||||||
Path largeFile1 = new Path(inputDir, "file1");
|
Path largeFile1 = new Path(inputDir, "file1");
|
||||||
Path largeFile2 = new Path(inputDir, "file2");
|
Path largeFile2 = new Path(inputDir, "file2");
|
||||||
Path largeFile3 = new Path(inputDir, "file3");
|
Path largeFile3 = new Path(inputDir, "file3");
|
||||||
mkdirs(srcFS, inputDir);
|
|
||||||
int fileSizeKb = conf.getInt(SCALE_TEST_DISTCP_FILE_SIZE_KB,
|
|
||||||
DEFAULT_DISTCP_SIZE_KB);
|
|
||||||
int fileSizeMb = fileSizeKb / 1024;
|
int fileSizeMb = fileSizeKb / 1024;
|
||||||
getLogger().info("{} with file size {}", testName.getMethodName(), fileSizeMb);
|
getLogger().info("{} with file size {}", testName.getMethodName(), fileSizeMb);
|
||||||
byte[] data1 = dataset((fileSizeMb + 1) * MB, 33, 43);
|
byte[] data1 = dataset((fileSizeMb + 1) * MB, 33, 43);
|
||||||
@ -549,22 +578,37 @@ private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS,
|
|||||||
createFile(srcFS, largeFile3, true, data3);
|
createFile(srcFS, largeFile3, true, data3);
|
||||||
Path target = new Path(dstDir, "outputDir");
|
Path target = new Path(dstDir, "outputDir");
|
||||||
runDistCp(inputDir, target);
|
runDistCp(inputDir, target);
|
||||||
ContractTestUtils.assertIsDirectory(dstFS, target);
|
|
||||||
verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
|
verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
|
||||||
verifyFileContents(dstFS, new Path(target, "inputDir/file2"), data2);
|
verifyFileContents(dstFS, new Path(target, "inputDir/file2"), data2);
|
||||||
verifyFileContents(dstFS, new Path(target, "inputDir/file3"), data3);
|
verifyFileContents(dstFS, new Path(target, "inputDir/file3"), data3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override point. What is the default distcp size
|
||||||
|
* for large files if not overridden by
|
||||||
|
* {@link #SCALE_TEST_DISTCP_FILE_SIZE_KB}.
|
||||||
|
* If 0 then, unless overridden in the configuration,
|
||||||
|
* the large file tests will not run.
|
||||||
|
* @return file size.
|
||||||
|
*/
|
||||||
|
protected int getDefaultDistCPSizeKb() {
|
||||||
|
return DEFAULT_DISTCP_SIZE_KB;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executes DistCp and asserts that the job finished successfully.
|
* Executes DistCp and asserts that the job finished successfully.
|
||||||
*
|
* The choice of direct/indirect is based on the value of
|
||||||
|
* {@link #shouldUseDirectWrite()}.
|
||||||
* @param src source path
|
* @param src source path
|
||||||
* @param dst destination path
|
* @param dst destination path
|
||||||
* @throws Exception if there is a failure
|
* @throws Exception if there is a failure
|
||||||
*/
|
*/
|
||||||
private void runDistCp(Path src, Path dst) throws Exception {
|
private void runDistCp(Path src, Path dst) throws Exception {
|
||||||
runDistCp(buildWithStandardOptions(
|
if (shouldUseDirectWrite()) {
|
||||||
new DistCpOptions.Builder(Collections.singletonList(src), dst)));
|
runDistCpDirectWrite(src, dst);
|
||||||
|
} else {
|
||||||
|
runDistCpWithRename(src, dst);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -607,6 +651,9 @@ private static void mkdirs(FileSystem fs, Path dir) throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testDirectWrite() throws Exception {
|
public void testDirectWrite() throws Exception {
|
||||||
describe("copy file from local to remote using direct write option");
|
describe("copy file from local to remote using direct write option");
|
||||||
|
if (shouldUseDirectWrite()) {
|
||||||
|
skip("not needed as all other tests use the -direct option.");
|
||||||
|
}
|
||||||
directWrite(localFS, localDir, remoteFS, remoteDir, true);
|
directWrite(localFS, localDir, remoteFS, remoteDir, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -623,8 +670,6 @@ public void testDistCpWithIterator() throws Exception {
|
|||||||
Path source = new Path(remoteDir, "src");
|
Path source = new Path(remoteDir, "src");
|
||||||
Path dest = new Path(localDir, "dest");
|
Path dest = new Path(localDir, "dest");
|
||||||
dest = localFS.makeQualified(dest);
|
dest = localFS.makeQualified(dest);
|
||||||
mkdirs(remoteFS, source);
|
|
||||||
verifyPathExists(remoteFS, "", source);
|
|
||||||
|
|
||||||
GenericTestUtils
|
GenericTestUtils
|
||||||
.createFiles(remoteFS, source, getDepth(), getWidth(), getWidth());
|
.createFiles(remoteFS, source, getDepth(), getWidth(), getWidth());
|
||||||
@ -632,8 +677,9 @@ public void testDistCpWithIterator() throws Exception {
|
|||||||
GenericTestUtils.LogCapturer log =
|
GenericTestUtils.LogCapturer log =
|
||||||
GenericTestUtils.LogCapturer.captureLogs(SimpleCopyListing.LOG);
|
GenericTestUtils.LogCapturer.captureLogs(SimpleCopyListing.LOG);
|
||||||
|
|
||||||
|
String options = "-useiterator -update -delete" + getDefaultCLIOptions();
|
||||||
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
|
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
|
||||||
dest.toString(), "-useiterator -update -delete", conf);
|
dest.toString(), options, conf);
|
||||||
|
|
||||||
// Check the target listing was also done using iterator.
|
// Check the target listing was also done using iterator.
|
||||||
Assertions.assertThat(log.getOutput()).contains(
|
Assertions.assertThat(log.getOutput()).contains(
|
||||||
@ -644,11 +690,11 @@ public void testDistCpWithIterator() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public int getDepth() {
|
public int getDepth() {
|
||||||
return 3;
|
return DEFAULT_DEPTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getWidth() {
|
public int getWidth() {
|
||||||
return 10;
|
return DEFAULT_WIDTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getTotalFiles() {
|
private int getTotalFiles() {
|
||||||
@ -659,6 +705,41 @@ private int getTotalFiles() {
|
|||||||
return totalFiles;
|
return totalFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override point: should direct write always be used?
|
||||||
|
* false by default; enable for stores where rename is slow.
|
||||||
|
* @return true if direct write should be used in all tests.
|
||||||
|
*/
|
||||||
|
protected boolean shouldUseDirectWrite() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the default options for distcp, including,
|
||||||
|
* if {@link #shouldUseDirectWrite()} is true,
|
||||||
|
* the -direct option.
|
||||||
|
* Append or prepend this to string CLIs.
|
||||||
|
* @return default options.
|
||||||
|
*/
|
||||||
|
protected String getDefaultCLIOptions() {
|
||||||
|
return shouldUseDirectWrite()
|
||||||
|
? " -direct "
|
||||||
|
: "";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the default options for distcp, including,
|
||||||
|
* if {@link #shouldUseDirectWrite()} is true,
|
||||||
|
* the -direct option, null if there are no
|
||||||
|
* defaults.
|
||||||
|
* @return default options.
|
||||||
|
*/
|
||||||
|
protected String getDefaultCLIOptionsOrNull() {
|
||||||
|
return shouldUseDirectWrite()
|
||||||
|
? " -direct "
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executes a test with support for using direct write option.
|
* Executes a test with support for using direct write option.
|
||||||
*
|
*
|
||||||
@ -683,7 +764,7 @@ private void directWrite(FileSystem srcFS, Path srcDir, FileSystem dstFS,
|
|||||||
if (directWrite) {
|
if (directWrite) {
|
||||||
runDistCpDirectWrite(inputDir, target);
|
runDistCpDirectWrite(inputDir, target);
|
||||||
} else {
|
} else {
|
||||||
runDistCp(inputDir, target);
|
runDistCpWithRename(inputDir, target);
|
||||||
}
|
}
|
||||||
ContractTestUtils.assertIsDirectory(dstFS, target);
|
ContractTestUtils.assertIsDirectory(dstFS, target);
|
||||||
lsR("Destination tree after distcp", dstFS, target);
|
lsR("Destination tree after distcp", dstFS, target);
|
||||||
@ -709,6 +790,21 @@ private Job runDistCpDirectWrite(final Path srcDir, final Path destDir)
|
|||||||
Collections.singletonList(srcDir), destDir)
|
Collections.singletonList(srcDir), destDir)
|
||||||
.withDirectWrite(true)));
|
.withDirectWrite(true)));
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* Run distcp srcDir destDir.
|
||||||
|
* @param srcDir local source directory
|
||||||
|
* @param destDir remote destination directory
|
||||||
|
* @return the completed job
|
||||||
|
* @throws Exception any failure.
|
||||||
|
*/
|
||||||
|
private Job runDistCpWithRename(Path srcDir, final Path destDir)
|
||||||
|
throws Exception {
|
||||||
|
describe("\nDistcp from " + srcDir + " to " + destDir);
|
||||||
|
return runDistCp(buildWithStandardOptions(
|
||||||
|
new DistCpOptions.Builder(
|
||||||
|
Collections.singletonList(srcDir), destDir)
|
||||||
|
.withDirectWrite(false)));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDistCpWithFile() throws Exception {
|
public void testDistCpWithFile() throws Exception {
|
||||||
@ -718,7 +814,6 @@ public void testDistCpWithFile() throws Exception {
|
|||||||
Path dest = new Path(localDir, "file");
|
Path dest = new Path(localDir, "file");
|
||||||
dest = localFS.makeQualified(dest);
|
dest = localFS.makeQualified(dest);
|
||||||
|
|
||||||
mkdirs(remoteFS, remoteDir);
|
|
||||||
mkdirs(localFS, localDir);
|
mkdirs(localFS, localDir);
|
||||||
|
|
||||||
int len = 4;
|
int len = 4;
|
||||||
@ -729,7 +824,7 @@ public void testDistCpWithFile() throws Exception {
|
|||||||
verifyPathExists(localFS, "", localDir);
|
verifyPathExists(localFS, "", localDir);
|
||||||
|
|
||||||
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
|
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
|
||||||
dest.toString(), null, conf);
|
dest.toString(), getDefaultCLIOptionsOrNull(), conf);
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
|
.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
|
||||||
@ -739,15 +834,12 @@ public void testDistCpWithFile() throws Exception {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDistCpWithUpdateExistFile() throws Exception {
|
public void testDistCpWithUpdateExistFile() throws Exception {
|
||||||
describe("Now update an exist file.");
|
describe("Now update an existing file.");
|
||||||
|
|
||||||
Path source = new Path(remoteDir, "file");
|
Path source = new Path(remoteDir, "file");
|
||||||
Path dest = new Path(localDir, "file");
|
Path dest = new Path(localDir, "file");
|
||||||
dest = localFS.makeQualified(dest);
|
dest = localFS.makeQualified(dest);
|
||||||
|
|
||||||
mkdirs(remoteFS, remoteDir);
|
|
||||||
mkdirs(localFS, localDir);
|
|
||||||
|
|
||||||
int len = 4;
|
int len = 4;
|
||||||
int base = 0x40;
|
int base = 0x40;
|
||||||
byte[] block = dataset(len, base, base + len);
|
byte[] block = dataset(len, base, base + len);
|
||||||
@ -758,7 +850,7 @@ public void testDistCpWithUpdateExistFile() throws Exception {
|
|||||||
verifyPathExists(remoteFS, "", source);
|
verifyPathExists(remoteFS, "", source);
|
||||||
verifyPathExists(localFS, "", dest);
|
verifyPathExists(localFS, "", dest);
|
||||||
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
|
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
|
||||||
dest.toString(), "-delete -update", conf);
|
dest.toString(), "-delete -update" + getDefaultCLIOptions(), conf);
|
||||||
|
|
||||||
Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
|
Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
|
||||||
.hasSize(1);
|
.hasSize(1);
|
||||||
|
@ -30,8 +30,9 @@
|
|||||||
* Verifies that the HDFS passes all the tests in
|
* Verifies that the HDFS passes all the tests in
|
||||||
* {@link AbstractContractDistCpTest}.
|
* {@link AbstractContractDistCpTest}.
|
||||||
* As such, it acts as an in-module validation of this contract test itself.
|
* As such, it acts as an in-module validation of this contract test itself.
|
||||||
|
* It does skip the large file test cases for speed.
|
||||||
*/
|
*/
|
||||||
public class OptionalTestHDFSContractDistCp extends AbstractContractDistCpTest {
|
public class TestHDFSContractDistCp extends AbstractContractDistCpTest {
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void createCluster() throws IOException {
|
public static void createCluster() throws IOException {
|
||||||
@ -47,4 +48,14 @@ public static void teardownCluster() throws IOException {
|
|||||||
protected AbstractFSContract createContract(Configuration conf) {
|
protected AbstractFSContract createContract(Configuration conf) {
|
||||||
return new HDFSContract(conf);
|
return new HDFSContract(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Turn off the large file tests as they are very slow and there
|
||||||
|
* are many other distcp to HDFS tests which verify such things.
|
||||||
|
* @return 0
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected int getDefaultDistCPSizeKb() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user