HDFS-17381. Distcp of EC files should not be limited to DFS. (#6551)

Contributed by Sadanand Shenoy
This commit is contained in:
Sadanand Shenoy 2024-09-25 22:24:09 +05:30 committed by GitHub
parent 21ec686be3
commit 49a495803a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 271 additions and 36 deletions

View File

@ -2108,4 +2108,23 @@ public class FileUtil {
LOG.info("Ignoring missing directory {}", path);
LOG.debug("Directory missing", e);
}
/**
* Return true if the FS implements {@link WithErasureCoding} and
* supports EC_POLICY option in {@link Options.OpenFileOptions}.
* A message is logged when the filesystem does not support Erasure coding.
* @param fs filesystem
* @param path path
* @return true if the Filesystem supports EC
* @throws IOException if there is a failure in hasPathCapability call
*/
public static boolean checkFSSupportsEC(FileSystem fs, Path path) throws IOException {
if (fs instanceof WithErasureCoding &&
fs.hasPathCapability(path, Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY)) {
return true;
}
LOG.warn("Filesystem with scheme {} does not support Erasure Coding" +
" at path {}", fs.getScheme(), path);
return false;
}
}

View File

@ -704,5 +704,10 @@ public final class Options {
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.collect(Collectors.toSet()));
/**
* EC policy to be set on the file that needs to be created : {@value}.
*/
public static final String FS_OPTION_OPENFILE_EC_POLICY =
FS_OPTION_OPENFILE + "ec.policy";
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
/**
* Filesystems that support EC can implement this interface.
*/
public interface WithErasureCoding {
/**
* Get the EC Policy name of the given file's fileStatus.
* If the file is not erasure coded, this shall return null.
* Callers will make sure to check if fileStatus isInstance of
* an FS that implements this interface.
* If the call fails due to some error, this shall return null.
* @param fileStatus object of the file whose ecPolicy needs to be obtained.
* @return the ec Policy name
*/
String getErasureCodingPolicyName(FileStatus fileStatus);
/**
* Set the given ecPolicy on the path.
* The path and ecPolicyName should be valid (not null/empty, the
* implementing FS shall support the supplied ecPolicy).
* implementations can throw IOException if these conditions are not met.
* @param path on which the EC policy needs to be set.
* @param ecPolicyName the EC policy.
* @throws IOException if there is an error during the set op.
*/
void setErasureCodingPolicy(Path path, String ecPolicyName) throws
IOException;
}

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.WithErasureCoding;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.client.DfsPathCapabilities;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@ -146,7 +147,8 @@ import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapa
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
@InterfaceStability.Unstable
public class DistributedFileSystem extends FileSystem
implements KeyProviderTokenIssuer, BatchListingOperations, LeaseRecoverable, SafeMode {
implements KeyProviderTokenIssuer, BatchListingOperations, LeaseRecoverable, SafeMode,
WithErasureCoding {
private Path workingDir;
private URI uri;
@ -376,6 +378,14 @@ public class DistributedFileSystem extends FileSystem
return dfs.createWrappedInputStream(dfsis);
}
@Override
public String getErasureCodingPolicyName(FileStatus fileStatus) {
if (!(fileStatus instanceof HdfsFileStatus)) {
return null;
}
return ((HdfsFileStatus) fileStatus).getErasureCodingPolicy().getName();
}
/**
* Create a handle to an HDFS file.
* @param st HdfsFileStatus instance from NameNode
@ -3862,6 +3872,10 @@ public class DistributedFileSystem extends FileSystem
*/
@Override
public FSDataOutputStream build() throws IOException {
String ecPolicy = getOptions().get(Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY, "");
if (!ecPolicy.isEmpty()) {
ecPolicyName(ecPolicy);
}
if (getFlags().contains(CreateFlag.CREATE) ||
getFlags().contains(CreateFlag.OVERWRITE)) {
if (isRecursive()) {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.client;
import java.util.Optional;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -54,6 +55,7 @@ public final class DfsPathCapabilities {
case CommonPathCapabilities.FS_STORAGEPOLICY:
case CommonPathCapabilities.FS_XATTRS:
case CommonPathCapabilities.FS_TRUNCATE:
case Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY:
return Optional.of(true);
case CommonPathCapabilities.FS_SYMLINKS:
return Optional.of(FileSystem.areSymlinksEnabled());

View File

@ -205,7 +205,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
}
if (sourceCurrStatus.isDirectory()) {
createTargetDirsWithRetry(description, target, context, sourceStatus);
createTargetDirsWithRetry(description, target, context, sourceStatus,
sourceFS);
return;
}
@ -295,10 +296,10 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
}
private void createTargetDirsWithRetry(String description, Path target,
Context context, FileStatus sourceStatus) throws IOException {
Context context, FileStatus sourceStatus, FileSystem sourceFS) throws IOException {
try {
new RetriableDirectoryCreateCommand(description).execute(target,
context, sourceStatus);
new RetriableDirectoryCreateCommand(description).execute(target, context,
sourceStatus, sourceFS);
} catch (Exception e) {
throw new IOException("mkdir failed for " + target, e);
}

View File

@ -18,16 +18,20 @@
package org.apache.hadoop.tools.mapred;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.WithErasureCoding;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.util.RetriableCommand;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.FileUtil.checkFSSupportsEC;
import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;
/**
@ -36,6 +40,9 @@ import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings
*/
public class RetriableDirectoryCreateCommand extends RetriableCommand {
private static final Logger LOG =
LoggerFactory.getLogger(RetriableDirectoryCreateCommand.class);
/**
* Constructor, taking a description of the action.
* @param description Verbose description of the copy operation.
@ -53,10 +60,11 @@ public class RetriableDirectoryCreateCommand extends RetriableCommand {
*/
@Override
protected Object doExecute(Object... arguments) throws Exception {
assert arguments.length == 3 : "Unexpected argument list.";
assert arguments.length == 4 : "Unexpected argument list.";
Path target = (Path)arguments[0];
Mapper.Context context = (Mapper.Context)arguments[1];
FileStatus sourceStatus = (FileStatus)arguments[2];
FileSystem sourceFs = (FileSystem)arguments[3];
FileSystem targetFS = target.getFileSystem(context.getConfiguration());
if(!targetFS.mkdirs(target)) {
@ -66,11 +74,16 @@ public class RetriableDirectoryCreateCommand extends RetriableCommand {
boolean preserveEC = getFileAttributeSettings(context)
.contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY);
if (preserveEC && sourceStatus.isErasureCoded()
&& targetFS instanceof DistributedFileSystem) {
ErasureCodingPolicy ecPolicy =
((HdfsFileStatus) sourceStatus).getErasureCodingPolicy();
DistributedFileSystem dfs = (DistributedFileSystem) targetFS;
dfs.setErasureCodingPolicy(target, ecPolicy.getName());
&& checkFSSupportsEC(sourceFs, sourceStatus.getPath())
&& checkFSSupportsEC(targetFS, target)) {
ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByName(
((WithErasureCoding) sourceFs).getErasureCodingPolicyName(
sourceStatus));
LOG.debug("EC Policy for source path is {}", ecPolicy);
WithErasureCoding ecFs = (WithErasureCoding) targetFS;
if (ecPolicy != null) {
ecFs.setErasureCodingPolicy(target, ecPolicy.getName());
}
}
return true;
}

View File

@ -24,9 +24,6 @@ import java.io.OutputStream;
import java.util.EnumSet;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,9 +33,11 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.WithErasureCoding;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.tools.CopyListingFileStatus;
@ -52,8 +51,10 @@ import org.apache.hadoop.tools.util.ThrottledInputStream;
import org.apache.hadoop.classification.VisibleForTesting;
import static org.apache.hadoop.fs.FileUtil.checkFSSupportsEC;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY;
import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
@ -151,8 +152,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
long offset = (action == FileAction.APPEND) ?
targetFS.getFileStatus(target).getLen() : source.getChunkOffset();
long bytesRead = copyToFile(targetPath, targetFS, source,
offset, context, fileAttributes, sourceChecksum, sourceStatus);
long bytesRead = copyToFile(targetPath, targetFS, source, offset, context,
fileAttributes, sourceChecksum, sourceStatus, sourceFS);
if (!source.isSplit()) {
DistCpUtils.compareFileLengthsAndChecksums(source.getLen(), sourceFS,
@ -195,7 +196,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
private long copyToFile(Path targetPath, FileSystem targetFS,
CopyListingFileStatus source, long sourceOffset, Mapper.Context context,
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum,
FileStatus sourceStatus)
FileStatus sourceStatus,FileSystem sourceFS)
throws IOException {
FsPermission permission = FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(targetFS.getConf()));
@ -205,11 +206,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
boolean preserveEC = getFileAttributeSettings(context)
.contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY);
ErasureCodingPolicy ecPolicy = null;
String ecPolicyName = null;
if (preserveEC && sourceStatus.isErasureCoded()
&& sourceStatus instanceof HdfsFileStatus
&& targetFS instanceof DistributedFileSystem) {
ecPolicy = ((HdfsFileStatus) sourceStatus).getErasureCodingPolicy();
&& checkFSSupportsEC(sourceFS, sourceStatus.getPath())
&& checkFSSupportsEC(targetFS, targetPath)) {
ecPolicyName = ((WithErasureCoding) sourceFS).getErasureCodingPolicyName(sourceStatus);
}
final OutputStream outStream;
if (action == FileAction.OVERWRITE) {
@ -222,21 +223,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
targetFS, targetPath);
FSDataOutputStream out;
ChecksumOpt checksumOpt = getChecksumOpt(fileAttributes, sourceChecksum);
if (!preserveEC || ecPolicy == null) {
if (!preserveEC || ecPolicyName == null) {
out = targetFS.create(targetPath, permission,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), copyBufferSize,
repl, blockSize, context, checksumOpt);
} else {
DistributedFileSystem dfs = (DistributedFileSystem) targetFS;
DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
dfs.createFile(targetPath).permission(permission).create()
.overwrite(true).bufferSize(copyBufferSize).replication(repl)
.blockSize(blockSize).progress(context).recursive()
.ecPolicyName(ecPolicy.getName());
if (checksumOpt != null) {
builder.checksumOpt(checksumOpt);
}
out = builder.build();
FSDataOutputStreamBuilder builder = targetFS.createFile(targetPath)
.permission(permission)
.overwrite(true)
.bufferSize(copyBufferSize)
.replication(repl)
.blockSize(blockSize)
.progress(context)
.recursive();
builder.opt(FS_OPTION_OPENFILE_EC_POLICY, ecPolicyName);
out = builder.build();
}
outStream = new BufferedOutputStream(out);
} else {

View File

@ -18,12 +18,21 @@
package org.apache.hadoop.tools;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.WithErasureCoding;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -41,8 +50,10 @@ import org.junit.Test;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
@ -68,12 +79,17 @@ public class TestDistCpWithRawXAttrs {
private static final String rootedSrcName = "/src";
private static final String rawDestName = "/.reserved/raw/dest";
private static final String rawSrcName = "/.reserved/raw/src";
private static final File base =
GenericTestUtils.getTestDir("work-dir/localfs");
private static final String TEST_ROOT_DIR = base.getAbsolutePath();
@BeforeClass
public static void init() throws Exception {
conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
conf.setClass("fs.file.impl", DummyEcFs.class, FileSystem.class);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
.build();
cluster.waitActive();
@ -240,6 +256,120 @@ public class TestDistCpWithRawXAttrs {
dfs.unsetErasureCodingPolicy(dir1);
}
@Test
public void testPreserveECAcrossFilesystems() throws Exception{
// set EC policy on source (HDFS)
String[] args = {"-setPolicy", "-path", dir1.toString(),
"-policy", "XOR-2-1-1024k"};
fs.delete(new Path("/dest"), true);
fs.mkdirs(subDir1);
DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.enableErasureCodingPolicy("XOR-2-1-1024k");
dfs.setErasureCodingPolicy(dir1, "XOR-2-1-1024k");
fs.create(file1).close();
int res = ToolRunner.run(conf, new ECAdmin(conf), args);
assertEquals("Unable to set EC policy on " + subDir1.toString(), 0, res);
String src = "/src/*";
Path dest = new Path(TEST_ROOT_DIR, "dest");
final Path dest2Dir1 = new Path(dest, "dir1");
final Path dest2SubDir1 = new Path(dest2Dir1, "subdir1");
// copy source(HDFS) to target(DummyECFS) with preserveEC
try (DummyEcFs dummyEcFs = (DummyEcFs)FileSystem.get(URI.create("file:///"), conf)) {
Path target = dummyEcFs.makeQualified(dest);
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src, target.toString(),
"-pe", conf);
try {
FileStatus destDir1Status = dummyEcFs.getFileStatus(dest2Dir1);
FileStatus destSubDir1Status = dummyEcFs.getFileStatus(dest2SubDir1);
assertNotNull("FileStatus for path: " + dest2Dir1 + " is null", destDir1Status);
assertNotNull("FileStatus for path: " + dest2SubDir1 + " is null", destSubDir1Status);
// check if target paths are erasure coded.
assertTrue("Path is not erasure coded : " + dest2Dir1,
dummyEcFs.isPathErasureCoded(destDir1Status.getPath()));
assertTrue("Path is not erasure coded : " + dest2SubDir1,
dummyEcFs.isPathErasureCoded(destSubDir1Status.getPath()));
// copy source(DummyECFS) to target (HDFS)
String dfsTarget = "/dest";
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS,
target.toString(), dfsTarget, "-pe", conf);
Path dfsTargetPath = new Path(dfsTarget);
Path dfsTargetDir1 = new Path(dfsTarget, "dir1");
ContractTestUtils.assertPathExists(fs,
"Path doesn't exist:" + dfsTargetPath, dfsTargetPath);
ContractTestUtils.assertPathExists(fs,
"Path doesn't exist:" + dfsTargetDir1, dfsTargetDir1);
FileStatus targetDir1Status = fs.getFileStatus(dfsTargetDir1);
assertTrue("Path is not erasure coded : " + targetDir1Status,
targetDir1Status.isErasureCoded());
fs.delete(dfsTargetPath, true);
} finally {
dummyEcFs.delete(new Path(base.getAbsolutePath()),true);
}
}
}
/**
* Dummy/Fake FS implementation that supports Erasure Coding.
*/
public static class DummyEcFs extends LocalFileSystem implements WithErasureCoding {
private Set<Path> erasureCodedPaths;
public DummyEcFs() {
super();
this.erasureCodedPaths = new HashSet<>();
}
public boolean isPathErasureCoded(Path p){
return erasureCodedPaths.contains(p);
}
@Override
public boolean hasPathCapability(Path path, String capability)
throws IOException {
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
case Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY:
return true;
default:
return super.hasPathCapability(path, capability);
}
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
FileStatus fileStatus = super.getFileStatus(f);
if (!erasureCodedPaths.contains(f)) {
return fileStatus;
}
Set<FileStatus.AttrFlags> attrSet = new HashSet<>();
attrSet.add(FileStatus.AttrFlags.HAS_EC);
return new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(),
fileStatus.getReplication(), fileStatus.getBlockSize(),
fileStatus.getModificationTime(), fileStatus.getAccessTime(),
fileStatus.getPermission(), fileStatus.getOwner(),
fileStatus.getGroup(),
fileStatus.isSymlink() ? fileStatus.getSymlink() : null,
fileStatus.getPath(),
attrSet);
}
@Override
public String getErasureCodingPolicyName(FileStatus fileStatus) {
return "XOR-2-1-1024k";
}
@Override
public void setErasureCodingPolicy(Path path, String ecPolicyName)
throws IOException {
erasureCodedPaths.add(path);
}
}
@Test
public void testUseIterator() throws Exception {