From 3418bbbb597d354bf24cfd610c1ad3adb06d8eae Mon Sep 17 00:00:00 2001
From: Da Zhou
Date: Wed, 8 May 2019 17:20:46 +0100
Subject: [PATCH] HADOOP-16269. ABFS: add listFileStatus with StartFrom.

Author: Da Zhou
---
 .../fs/azurebfs/AzureBlobFileSystemStore.java | 107 ++++++++++++-
 .../azurebfs/constants/AbfsHttpConstants.java |   9 ++
 .../hadoop/fs/azurebfs/utils/CRC64.java       |  60 +++++++
 .../azurebfs/AbstractAbfsIntegrationTest.java |   7 +-
 ...lobFileSystemStoreListStatusWithRange.java | 151 ++++++++++++++++++
 .../hadoop/fs/azurebfs/TestAbfsCrc64.java     |  38 +++++
 6 files changed, 363 insertions(+), 9 deletions(-)
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CRC64.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemStoreListStatusWithRange.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsCrc64.java

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index a8e0ed3ef0..2402dbc038 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -32,6 +32,7 @@
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -47,6 +48,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -81,6 +83,7 @@
 import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
 import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
+import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -92,7 +95,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
+
 /**
  * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
  */
@@ -106,6 +119,7 @@ public class AzureBlobFileSystemStore implements Closeable {
   private String userName;
   private String primaryUserGroup;
   private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
+  private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
   private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
   private static final int LIST_MAX_RESULTS = 500;
 
@@ -522,15 +536,43 @@ public FileStatus getFileStatus(final Path path) throws IOException {
         eTag);
   }
 
+  /**
+   * @param path The list path.
+   * @return the entries in the path.
+   * */
   public FileStatus[] listStatus(final Path path) throws IOException {
-    LOG.debug("listStatus filesystem: {} path: {}",
+    return listStatus(path, null);
+  }
+
+  /**
+   * @param path the list path.
+   * @param startFrom the entry name that list results should start with.
+   *                  For example, if folder "/folder" contains four files: "afile", "bfile", "hfile", "ifile",
+   *                  then listStatus(Path("/folder"), "hfile") will return "/folder/hfile" and "/folder/ifile".
+   *                  Note that if startFrom is a non-existent entry name, the list response contains
+   *                  all entries after this non-existent entry in lexical order:
+   *                  listStatus(Path("/folder"), "cfile") will return "/folder/hfile" and "/folder/ifile".
+   *
+   * @return the entries in the path starting from "startFrom" in lexical order.
+   * */
+  @InterfaceStability.Unstable
+  public FileStatus[] listStatus(final Path path, final String startFrom) throws IOException {
+    LOG.debug("listStatus filesystem: {} path: {}, startFrom: {}",
             client.getFileSystem(),
-            path);
+            path,
+            startFrom);
 
-    String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
+    final String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
     String continuation = null;
-    ArrayList<FileStatus> fileStatuses = new ArrayList<>();
+
+    // generate continuation token if a valid startFrom is provided.
+    if (startFrom != null && !startFrom.isEmpty()) {
+      continuation = getIsNamespaceEnabled()
+          ? generateContinuationTokenForXns(startFrom)
+          : generateContinuationTokenForNonXns(path.isRoot() ? ROOT_PATH : relativePath, startFrom);
+    }
+
+    ArrayList<FileStatus> fileStatuses = new ArrayList<>();
     do {
       AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation);
       continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
@@ -583,6 +625,61 @@ public FileStatus[] listStatus(final Path path) throws IOException {
     return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
   }
 
+  // generate continuation token for xns account
+  private String generateContinuationTokenForXns(final String firstEntryName) {
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName)
+        && !firstEntryName.startsWith(AbfsHttpConstants.ROOT_PATH),
+        "startFrom must be a dir/file name and it can not be a full path");
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(firstEntryName).append("#$").append("0");
+
+    CRC64 crc64 = new CRC64();
+    StringBuilder token = new StringBuilder();
+    token.append(crc64.compute(sb.toString().getBytes(StandardCharsets.UTF_8)))
+        .append(SINGLE_WHITE_SPACE)
+        .append("0")
+        .append(SINGLE_WHITE_SPACE)
+        .append(firstEntryName);
+
+    return Base64.encode(token.toString().getBytes(StandardCharsets.UTF_8));
+  }
+
+  // generate continuation token for non-xns account
+  private String generateContinuationTokenForNonXns(final String path, final String firstEntryName) {
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName)
+        && !firstEntryName.startsWith(AbfsHttpConstants.ROOT_PATH),
+        "startFrom must be a dir/file name and it can not be a full path");
+
+    // Notice: non-xns continuation token requires full path (first "/" is not included) for startFrom
+    final String startFrom = (path.isEmpty() || path.equals(ROOT_PATH))
+        ? firstEntryName
+        : path + ROOT_PATH + firstEntryName;
+
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(TOKEN_DATE_PATTERN, Locale.US);
+    String date = simpleDateFormat.format(new Date());
+    String token = String.format("%06d!%s!%06d!%s!%06d!%s!",
+        path.length(), path, startFrom.length(), startFrom, date.length(), date);
+    String base64EncodedToken = Base64.encode(token.getBytes(StandardCharsets.UTF_8));
+
+    StringBuilder encodedTokenBuilder = new StringBuilder(base64EncodedToken.length() + 5);
+    encodedTokenBuilder.append(String.format("%s!%d!", TOKEN_VERSION, base64EncodedToken.length()));
+
+    for (int i = 0; i < base64EncodedToken.length(); i++) {
+      char current = base64EncodedToken.charAt(i);
+      if (CHAR_FORWARD_SLASH == current) {
+        current = CHAR_UNDERSCORE;
+      } else if (CHAR_PLUS == current) {
+        current = CHAR_STAR;
+      } else if (CHAR_EQUALS == current) {
+        current = CHAR_HYPHEN;
+      }
+      encodedTokenBuilder.append(current);
+    }
+
+    return encodedTokenBuilder.toString();
+  }
+
   public void setOwner(final Path path, final String owner, final String group) throws
       AzureBlobFileSystemException {
     if (!getIsNamespaceEnabled()) {
@@ -1002,7 +1099,7 @@ public boolean equals(Object obj) {
 
       FileStatus other = (FileStatus) obj;
 
-      if (!other.equals(this)) {// compare the path
+      if (!this.getPath().equals(other.getPath())) {// compare the path
         return false;
       }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 1f35854af2..e85c7f0e84 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -39,6 +39,7 @@ public final class AbfsHttpConstants {
   public static final String GET_ACCESS_CONTROL = "getAccessControl";
   public static final String GET_STATUS = "getStatus";
   public static final String DEFAULT_TIMEOUT = "90";
+  public static final String TOKEN_VERSION = "2";
 
   public static final String JAVA_VERSION = "java.version";
   public static final String OS_NAME = "os.name";
@@ -91,5 +92,13 @@ public final class AbfsHttpConstants {
   public static final String PERMISSION_FORMAT = "%04d";
   public static final String SUPER_USER = "$superuser";
 
+  public static final char CHAR_FORWARD_SLASH = '/';
+  public static final char CHAR_EXCLAMATION_POINT = '!';
+  public static final char CHAR_UNDERSCORE = '_';
+  public static final char CHAR_HYPHEN = '-';
+  public static final char CHAR_EQUALS = '=';
+  public static final char CHAR_STAR = '*';
+  public static final char CHAR_PLUS = '+';
+
   private AbfsHttpConstants() {}
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CRC64.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CRC64.java
new file mode 100644
index 0000000000..9790744065
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CRC64.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.utils;
+
+/**
+ * CRC64 implementation for AzureBlobFileSystem.
+ */
+public class CRC64 {
+
+  private static final long POLY = 0x9a6c9329ac4bc9b5L;
+  private static final int TABLE_LENGTH = 256;
+  private static final long[] TABLE = new long[TABLE_LENGTH];
+
+  private long value = -1;
+
+  /**
+   * @param input the input byte array.
+   * @return long value of the CRC-64 checksum of the data.
+   * */
+  public long compute(byte[] input) {
+    init();
+    for (int i = 0; i < input.length; i++) {
+      value = TABLE[(input[i] ^ (int) value) & 0xFF] ^ (value >>> 8);
+    }
+    return ~value;
+  }
+
+  /*
+   * Initialize a table constructed from POLY (0x9a6c9329ac4bc9b5L).
+   * */
+  private void init() {
+    value = -1;
+    for (int n = 0; n < TABLE_LENGTH; ++n) {
+      long crc = n;
+      for (int i = 0; i < 8; ++i) {
+        if ((crc & 1) == 1) {
+          crc = (crc >>> 1) ^ POLY;
+        } else {
+          crc >>>= 1;
+        }
+      }
+      TABLE[n] = crc;
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index fc2258997b..3eaed1f879 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -24,7 +24,6 @@
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
-import com.google.common.base.Preconditions;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -211,9 +210,9 @@ public AzureBlobFileSystem getFileSystem(String abfsUri) throws Exception {
    * @throws IOException failure during create/init.
    */
   public AzureBlobFileSystem createFileSystem() throws IOException {
-    Preconditions.checkState(abfs == null,
-        "existing ABFS instance exists: %s", abfs);
-    abfs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
+    if (abfs == null) {
+      abfs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
+    }
     return abfs;
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemStoreListStatusWithRange.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemStoreListStatusWithRange.java
new file mode 100644
index 0000000000..849bb6ba09
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemStoreListStatusWithRange.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Test AzureBlobFileSystemStore listStatus with startFrom.
+ * */
+@RunWith(Parameterized.class)
+public class ITestAzureBlobFileSystemStoreListStatusWithRange extends
+    AbstractAbfsIntegrationTest {
+  private static final boolean SUCCEED = true;
+  private static final boolean FAIL = false;
+  private static final String[] SORTED_ENTRY_NAMES = {"1_folder", "A0", "D01", "a+", "c0", "name5"};
+
+  private AzureBlobFileSystemStore store;
+  private AzureBlobFileSystem fs;
+
+  @Parameterized.Parameter
+  public String path;
+
+  /**
+   * A valid startFrom for listFileStatus with range is a non-fully-qualified dir/file name.
+   * */
+  @Parameterized.Parameter(1)
+  public String startFrom;
+
+  @Parameterized.Parameter(2)
+  public int expectedStartIndexInArray;
+
+  @Parameterized.Parameter(3)
+  public boolean expectedResult;
+
+  @Parameterized.Parameters(name = "Testing path \"{0}\", startFrom: \"{1}\", Expecting result : {3}") // Test path
+  public static Iterable<Object[]> params() {
+    return Arrays.asList(
+        new Object[][]{
+            // case 0: list in root, without range
+            {"/", null, 0, SUCCEED},
+
+            // case 1: list in the root, start from the second file
+            {"/", SORTED_ENTRY_NAMES[1], 1, SUCCEED},
+
+            // case 2: list in the root, invalid startFrom
+            {"/", "/", -1, FAIL},
+
+            // case 3: list in non-root level, valid startFrom : dir name
+            {"/" + SORTED_ENTRY_NAMES[2], SORTED_ENTRY_NAMES[1], 1, SUCCEED},
+
+            // case 4: list in non-root level, valid startFrom : file name
+            {"/" + SORTED_ENTRY_NAMES[2], SORTED_ENTRY_NAMES[2], 2, SUCCEED},
+
+            // case 5: list in non-root level, invalid startFrom
+            {"/" + SORTED_ENTRY_NAMES[2], "/" + SORTED_ENTRY_NAMES[3], -1, FAIL},
+
+            // case 6: list using a non-existent startFrom that is smaller than all entries in lexical order,
+            //         expecting all entries to be returned
+            {"/" + SORTED_ENTRY_NAMES[2], "0-non-existent", 0, SUCCEED},
+
+            // case 7: list using a non-existent startFrom that is larger than all entries in lexical order,
+            //         expecting 0 entries to be returned
+            {"/" + SORTED_ENTRY_NAMES[2], "z-non-existent", -1, SUCCEED},
+
+            // case 8: list using a non-existent startFrom that falls within the range of entries
+            {"/" + SORTED_ENTRY_NAMES[2], "A1", 2, SUCCEED}
+        });
+  }
+
+  public ITestAzureBlobFileSystemStoreListStatusWithRange() throws Exception {
+    super();
+    if (this.getFileSystem() == null) {
+      super.createFileSystem();
+    }
+    fs = this.getFileSystem();
+    store = fs.getAbfsStore();
+    prepareTestFiles();
+    // Sort the names for verification; the ABFS service should return the results in lexical order.
+    Arrays.sort(SORTED_ENTRY_NAMES);
+  }
+
+  @Test
+  public void testListWithRange() throws IOException {
+    try {
+      FileStatus[] listResult = store.listStatus(new Path(path), startFrom);
+      if (!expectedResult) {
+        Assert.fail("Expecting failure with IllegalArgumentException");
+      }
+      verifyFileStatus(listResult, new Path(path), expectedStartIndexInArray);
+    } catch (IllegalArgumentException ex) {
+      if (expectedResult) {
+        Assert.fail("Expecting success");
+      }
+    }
+  }
+
+  // compare the file status
+  private void verifyFileStatus(FileStatus[] listResult, Path parentPath, int startIndexInSortedName) throws IOException {
+    if (startIndexInSortedName == -1) {
+      Assert.assertEquals("Expected empty FileStatus array", 0, listResult.length);
+      return;
+    }
+
+    FileStatus[] allFileStatuses = fs.listStatus(parentPath);
+    Assert.assertEquals("number of dir/file doesn't match",
+        SORTED_ENTRY_NAMES.length, allFileStatuses.length);
+    int indexInResult = 0;
+    for (int index = startIndexInSortedName; index < SORTED_ENTRY_NAMES.length; index++) {
+      Assert.assertEquals("fileStatus doesn't match", allFileStatuses[index], listResult[indexInResult++]);
+    }
+  }
+
+  private void prepareTestFiles() throws IOException {
+    final AzureBlobFileSystem fs = getFileSystem();
+    // create a 2-level file structure
+    for (String levelOneFolder : SORTED_ENTRY_NAMES) {
+      Path levelOnePath = new Path("/" + levelOneFolder);
+      Assert.assertTrue(fs.mkdirs(levelOnePath));
+      for (String fileName : SORTED_ENTRY_NAMES) {
+        Path filePath = new Path(levelOnePath, fileName);
+        ContractTestUtils.touch(fs, filePath);
+        ContractTestUtils.assertIsFile(fs, filePath);
+      }
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsCrc64.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsCrc64.java
new file mode 100644
index 0000000000..ab39750ebf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsCrc64.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.utils.CRC64;
+/**
+ * Test for Crc64 in AzureBlobFileSystem; note that the ABFS CRC64 uses its own polynomial.
+ * */
+public class TestAbfsCrc64 {
+
+  @Test
+  public void testCrc64Compute() {
+    CRC64 crc64 = new CRC64();
+    final String[] testStr = {"#$", "dir_2_ac83abee", "dir_42_976df1f5"};
+    final String[] expected = {"f91f7e6a837dbfa8", "203f9fefc38ae97b", "cc0d56eafe58a855"};
+    for (int i = 0; i < testStr.length; i++) {
+      Assert.assertEquals(expected[i], Long.toHexString(crc64.compute(testStr[i].getBytes())));
+    }
+  }
+}
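
Usage sketch for reviewers (illustrative only, not part of the changed files): the snippet below shows how the new store-level call might be driven. It assumes a class placed in the org.apache.hadoop.fs.azurebfs package, because AzureBlobFileSystem#getAbfsStore() is only visible there (the integration test above relies on the same access), and an ABFS URI configured as fs.defaultFS; the "/data" directory and "part-0042" entry name are made-up examples.

    package org.apache.hadoop.fs.azurebfs;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListStatusWithStartFromSketch {
      public static void main(String[] args) throws Exception {
        // Assumes fs.defaultFS points at abfs://<container>@<account>.dfs.core.windows.net.
        Configuration conf = new Configuration();
        AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(conf);
        AzureBlobFileSystemStore store = fs.getAbfsStore();

        // startFrom is a bare entry name, never a full path; a leading "/" fails the precondition.
        // The result holds "part-0042" (if it exists) and every later sibling in lexical order;
        // a non-existent name simply starts the listing at the next entry in lexical order.
        FileStatus[] statuses = store.listStatus(new Path("/data"), "part-0042");
        for (FileStatus status : statuses) {
          System.out.println(status.getPath());
        }
      }
    }

The new overload is marked @InterfaceStability.Unstable and is not exposed through the public FileSystem API, so callers outside this module should not depend on it.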