From 52db86b0bb4d2fa644362c23a6566eec1cb5200b Mon Sep 17 00:00:00 2001 From: Jinglun Date: Thu, 8 Oct 2020 17:36:07 +0800 Subject: [PATCH] HADOOP-17021. Add concat fs command (#1993) Contributed by Jinglun --- .../org/apache/hadoop/fs/shell/Concat.java | 91 ++++++++++ .../org/apache/hadoop/fs/shell/FsCommand.java | 1 + .../src/site/markdown/FileSystemShell.md | 13 ++ .../hadoop/fs/shell/TestFsShellConcat.java | 167 ++++++++++++++++++ 4 files changed, 272 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Concat.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestFsShellConcat.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Concat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Concat.java new file mode 100644 index 0000000000..5afafaff09 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Concat.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.shell; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.LinkedList; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; + +/** + * Concat the given files. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class Concat extends FsCommand { + public static void registerCommands(CommandFactory factory) { + factory.addClass(Concat.class, "-concat"); + } + + public static final String NAME = "concat"; + public static final String USAGE = " ..."; + public static final String DESCRIPTION = "Concatenate existing source files" + + " into the target file. Target file and source files should be in the" + + " same directory."; + private static FileSystem testFs; // test only. + + @Override + protected void processArguments(LinkedList args) + throws IOException { + if (args.size() < 1) { + throw new IOException("Target path not specified. " + USAGE); + } + if (args.size() < 3) { + throw new IOException( + "The number of source paths is less than 2. " + USAGE); + } + PathData target = args.removeFirst(); + LinkedList srcList = args; + if (!target.exists || !target.stat.isFile()) { + throw new FileNotFoundException(String + .format("Target path %s does not exist or is" + " not file.", + target.path)); + } + Path[] srcArray = new Path[srcList.size()]; + for (int i = 0; i < args.size(); i++) { + PathData src = srcList.get(i); + if (!src.exists || !src.stat.isFile()) { + throw new FileNotFoundException( + String.format("%s does not exist or is not file.", src.path)); + } + srcArray[i] = src.path; + } + FileSystem fs = target.fs; + if (testFs != null) { + fs = testFs; + } + try { + fs.concat(target.path, srcArray); + } catch (UnsupportedOperationException exception) { + throw new PathIOException("Dest filesystem '" + fs.getUri().getScheme() + + "' doesn't support concat.", exception); + } + } + + @VisibleForTesting + static void setTestFs(FileSystem fs) { + testFs = fs; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/FsCommand.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/FsCommand.java index ea8378dc45..0abf0e3c58 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/FsCommand.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/FsCommand.java @@ -70,6 +70,7 @@ public static void registerCommands(CommandFactory factory) { factory.registerCommands(Truncate.class); factory.registerCommands(SnapshotCommands.class); factory.registerCommands(XAttrCommands.class); + factory.registerCommands(Concat.class); } protected FsCommand() {} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md index 58b7209c13..c5080dba29 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md @@ -813,6 +813,18 @@ Example: * `hadoop fs -truncate 55 /user/hadoop/file1 /user/hadoop/file2` * `hadoop fs -truncate -w 127 hdfs://nn1.example.com/user/hadoop/file1` +concat +-------- + +Usage: `hadoop fs -concat ` + +Concatenate existing source files into the target file. Target file and source +files should be in the same directory. + +Example: + +* `hadoop fs -concat hdfs://cluster/user/hadoop/target-file hdfs://cluster/user/hadoop/file-0 hdfs://cluster/user/hadoop/file-1` + usage ----- @@ -1092,6 +1104,7 @@ actually fail. | `setfattr` | generally unsupported permissions model | | `setrep`| has no effect | | `truncate` | generally unsupported | +| `concat` | generally unsupported | Different object store clients *may* support these commands: do consult the documentation and test against the target store. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestFsShellConcat.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestFsShellConcat.java new file mode 100644 index 0000000000..a2c4d3a197 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestFsShellConcat.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.shell; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URI; +import java.util.Random; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.assertj.core.api.Assertions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.AbstractHadoopTestBase; + +import static org.mockito.ArgumentMatchers.any; +import static org.junit.Assert.assertEquals; + +/** + * Test Concat. + */ +public class TestFsShellConcat extends AbstractHadoopTestBase { + + private static Configuration conf; + private static FsShell shell; + private static LocalFileSystem lfs; + private static Path testRootDir; + private static Path dstPath; + + @Before + public void before() throws IOException { + conf = new Configuration(); + shell = new FsShell(conf); + lfs = FileSystem.getLocal(conf); + testRootDir = lfs.makeQualified(new Path(GenericTestUtils.getTempPath( + "testFsShellCopy"))); + + lfs.delete(testRootDir, true); + lfs.mkdirs(testRootDir); + lfs.setWorkingDirectory(testRootDir); + dstPath = new Path(testRootDir, "dstFile"); + lfs.create(dstPath).close(); + + Random random = new Random(); + for (int i = 0; i < 10; i++) { + OutputStream out = + lfs.create(new Path(testRootDir, String.format("file-%02d", i))); + out.write(random.nextInt()); + out.close(); + } + } + + @Test + public void testConcat() throws Exception { + // Read concatenated files to build the expected file content. + ByteArrayOutputStream out = new ByteArrayOutputStream(); + for (int i = 0; i < 10; i++) { + try (InputStream in = lfs + .open(new Path(testRootDir, String.format("file-%02d", i)))) { + IOUtils.copyBytes(in, out, 1024); + } + } + byte[] expectContent = out.toByteArray(); + + // Do concat. + FileSystem mockFs = Mockito.mock(FileSystem.class); + Mockito.doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + Path target = (Path)args[0]; + Path[] src = (Path[]) args[1]; + mockConcat(target, src); + return null; + }).when(mockFs).concat(any(Path.class), any(Path[].class)); + Concat.setTestFs(mockFs); + shellRun(0, "-concat", dstPath.toString(), testRootDir+"/file-*"); + + // Verify concat result. + ContractTestUtils + .assertPathExists(lfs, "The target file doesn't exist.", dstPath); + Assertions.assertThat(lfs.listStatus(testRootDir).length).isEqualTo(1); + assertEquals(expectContent.length, lfs.getFileStatus(dstPath).getLen()); + out = new ByteArrayOutputStream(); + try (InputStream in = lfs.open(dstPath)) { + IOUtils.copyBytes(in, out, 1024); + } + // Verify content. + byte[] concatedContent = out.toByteArray(); + assertEquals(expectContent.length, concatedContent.length); + ContractTestUtils.compareByteArrays(expectContent, concatedContent, + expectContent.length); + } + + @Test + public void testUnsupportedFs() throws Exception { + FileSystem mockFs = Mockito.mock(FileSystem.class); + Mockito.doThrow( + new UnsupportedOperationException("Mock unsupported exception.")) + .when(mockFs).concat(any(Path.class), any(Path[].class)); + Mockito.doAnswer(invocationOnMock -> new URI("mockfs:///")).when(mockFs) + .getUri(); + Concat.setTestFs(mockFs); + final ByteArrayOutputStream err = new ByteArrayOutputStream(); + PrintStream oldErr = System.err; + System.setErr(new PrintStream(err)); + try { + shellRun(1, "-concat", dstPath.toString(), testRootDir + "/file-*"); + } finally { + System.setErr(oldErr); + } + System.err.print(err.toString()); + String expectedErrMsg = "Dest filesystem 'mockfs' doesn't support concat"; + Assertions.assertThat(err.toString().contains(expectedErrMsg)) + .withFailMessage("The err message should contain \"" + expectedErrMsg + + "\" message.").isTrue(); + } + + private void shellRun(int n, String... args) { + assertEquals(n, shell.run(args)); + } + + /** + * Simple simulation of concat. + */ + private void mockConcat(Path target, Path[] srcArray) throws IOException { + Path tmp = new Path(target.getParent(), target.getName() + ".bak"); + lfs.rename(target, tmp); + try (OutputStream out = lfs.create(target)) { + try (InputStream in = lfs.open(tmp)) { + IOUtils.copyBytes(in, out, 1024); + } + lfs.delete(tmp, true); + for (int i = 0; i < srcArray.length; i++) { + try (InputStream iin = lfs.open(srcArray[i])) { + IOUtils.copyBytes(iin, out, 1024); + } + lfs.delete(srcArray[i], true); + } + } + } +}