diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 1216fe0696..35fe33be71 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -359,8 +359,11 @@ public class AbfsConfiguration{ FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE) private boolean renameResilience; - private String clientProvidedEncryptionKey; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION) + private boolean isChecksumValidationEnabled; + private String clientProvidedEncryptionKey; private String clientProvidedEncryptionKeySHA; public AbfsConfiguration(final Configuration rawConfig, String accountName) @@ -1240,4 +1243,13 @@ public boolean getRenameResilience() { void setRenameResilience(boolean actualResilience) { renameResilience = actualResilience; } + + public boolean getIsChecksumValidationEnabled() { + return isChecksumValidationEnabled; + } + + @VisibleForTesting + public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) { + this.isChecksumValidationEnabled = isChecksumValidationEnabled; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 63de71eb17..d746e3c9e3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -93,6 +93,7 @@ public final class AbfsHttpConstants { public static final String FORWARD_SLASH_ENCODE = "%2F"; public static final String AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER = "@"; public static final String UTF_8 = "utf-8"; + public static final String MD5 = "MD5"; public static final String GMT_TIMEZONE = "GMT"; public static final String APPLICATION_JSON = "application/json"; public static final String APPLICATION_OCTET_STREAM = "application/octet-stream"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index af60ce949f..b11c8c2ad1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -275,6 +275,9 @@ public final class ConfigurationKeys { /** Add extra resilience to rename failures, at the expense of performance. */ public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience"; + /** Add extra layer of verification of the integrity of the request content during transport: {@value}. */ + public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation"; + public static String accountProperty(String property, String account) { return property + "." 
+ account; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 331c9e5684..dd4d7edc6b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -133,6 +133,7 @@ public final class FileSystemConfigurations { public static final int STREAM_ID_LEN = 12; public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true; public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true; + public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false; /** * Limit of queued block upload operations before writes diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index c792e463c7..84a94b994c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -72,6 +72,7 @@ public final class HttpHeaderConfigurations { public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id"; public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period"; public static final String EXPECT = "Expect"; + public static final String X_MS_RANGE_GET_CONTENT_MD5 = "x-ms-range-get-content-md5"; private HttpHeaderConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java new file mode 100644 index 0000000000..a7635e893c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; + +/** + * Exception to be thrown if any Runtime Exception occurs. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AbfsDriverException extends AbfsRestOperationException { + + private static final String ERROR_MESSAGE = "Runtime Exception Occurred In ABFS Driver"; + + public AbfsDriverException(final Exception innerException) { + super( + AzureServiceErrorCode.UNKNOWN.getStatusCode(), + AzureServiceErrorCode.UNKNOWN.getErrorCode(), + innerException != null + ? innerException.toString() + : ERROR_MESSAGE, + innerException); + } + + public AbfsDriverException(final Exception innerException, final String activityId) { + super( + AzureServiceErrorCode.UNKNOWN.getStatusCode(), + AzureServiceErrorCode.UNKNOWN.getErrorCode(), + innerException != null + ? innerException.toString() + ", rId: " + activityId + : ERROR_MESSAGE + ", rId: " + activityId, + null); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsInvalidChecksumException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsInvalidChecksumException.java new file mode 100644 index 0000000000..ccf2937542 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsInvalidChecksumException.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; + +/** + * Exception to wrap invalid checksum verification on client side. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AbfsInvalidChecksumException extends AbfsRestOperationException { + + private static final String ERROR_MESSAGE = "Checksum Validation Failed, MD5 Mismatch Error"; + + public AbfsInvalidChecksumException(final AbfsRestOperationException abfsRestOperationException) { + super( + abfsRestOperationException != null + ? abfsRestOperationException.getStatusCode() + : AzureServiceErrorCode.UNKNOWN.getStatusCode(), + abfsRestOperationException != null + ? abfsRestOperationException.getErrorCode().getErrorCode() + : AzureServiceErrorCode.UNKNOWN.getErrorCode(), + abfsRestOperationException != null + ? abfsRestOperationException.toString() + : ERROR_MESSAGE, + abfsRestOperationException); + } + + public AbfsInvalidChecksumException(final String activityId) { + super( + AzureServiceErrorCode.UNKNOWN.getStatusCode(), + AzureServiceErrorCode.UNKNOWN.getErrorCode(), + ERROR_MESSAGE + ", rId: " + activityId, + null); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index 8a5e9db855..6c0ecfcdf8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -47,6 +47,8 @@ public enum AzureServiceErrorCode { INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null), AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null), ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null), + MD5_MISMATCH("Md5Mismatch", HttpURLConnection.HTTP_BAD_REQUEST, + "The MD5 value specified in the request did not match with the MD5 value calculated by the server."), UNKNOWN(null, -1, null); private final String errorCode; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index cb6f8e9ead..45da438a91 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -25,7 +25,10 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.URLEncoder; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Base64; import java.util.List; import java.util.Locale; import java.util.UUID; @@ -34,6 
+37,9 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.utils.NamespaceUtil; import org.apache.hadoop.fs.store.LogExactlyOnce; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions; @@ -76,6 +82,7 @@ import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; @@ -851,6 +858,11 @@ public AbfsRestOperation append(final String path, final byte[] buffer, requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry)); } + // Add MD5 Hash of request content as request header if feature is enabled + if (isChecksumValidationEnabled()) { + addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer); + } + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilder, cachedSasToken); @@ -867,7 +879,7 @@ public AbfsRestOperation append(final String path, final byte[] buffer, sasTokenForReuse); try { op.execute(tracingContext); - } catch (AzureBlobFileSystemException e) { + } catch (AbfsRestOperationException e) { /* If the http response code indicates a user error we retry the same append request with expect header being disabled. @@ -877,7 +889,7 @@ public AbfsRestOperation append(final String path, final byte[] buffer, if someone has taken dependency on the exception message, which is created using the error string present in the response header. */ - int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode(); + int responseStatusCode = e.getStatusCode(); if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) { LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path); reqParams.setExpectHeaderEnabled(false); @@ -889,6 +901,11 @@ public AbfsRestOperation append(final String path, final byte[] buffer, if (!op.hasResult()) { throw e; } + + if (isMd5ChecksumError(e)) { + throw new AbfsInvalidChecksumException(e); + } + if (reqParams.isAppendBlob() && appendSuccessCheckOp(op, path, (reqParams.getPosition() + reqParams.getLength()), tracingContext)) { @@ -907,6 +924,13 @@ && appendSuccessCheckOp(op, path, throw e; } + catch (AzureBlobFileSystemException e) { + // Any server side issue will be returned as AbfsRestOperationException and will be handled above. 
+ LOG.debug("Append request failed with non server issues for path: {}, offset: {}, position: {}", + path, reqParams.getoffset(), reqParams.getPosition()); + throw e; + } + return op; } @@ -920,6 +944,16 @@ private boolean checkUserError(int responseStatusCode) { && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR); } + /** + * To check if the failure exception returned by server is due to MD5 Mismatch + * @param e Exception returned by AbfsRestOperation + * @return boolean whether exception is due to MD5Mismatch or not + */ + private boolean isMd5ChecksumError(final AbfsRestOperationException e) { + AzureServiceErrorCode storageErrorCode = e.getErrorCode(); + return storageErrorCode == AzureServiceErrorCode.MD5_MISMATCH; + } + // For AppendBlob its possible that the append succeeded in the backend but the request failed. // However a retry would fail with an InvalidQueryParameterValue // (as the current offset would be unacceptable). @@ -1049,10 +1083,16 @@ public AbfsRestOperation read(final String path, final List requestHeaders = createDefaultHeaders(); addEncryptionKeyRequestHeaders(path, requestHeaders, false, contextEncryptionAdapter, tracingContext); - requestHeaders.add(new AbfsHttpHeader(RANGE, - String.format("bytes=%d-%d", position, position + bufferLength - 1))); + AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, + String.format("bytes=%d-%d", position, position + bufferLength - 1)); + requestHeaders.add(rangeHeader); requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + // Add request header to fetch MD5 Hash of data returned by server. + if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { + requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE)); + } + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION, @@ -1069,6 +1109,11 @@ public AbfsRestOperation read(final String path, bufferLength, sasTokenForReuse); op.execute(tracingContext); + // Verify the MD5 hash returned by server holds valid on the data received. + if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { + verifyCheckSumForRead(buffer, op.getResult(), bufferOffset); + } + return op; } @@ -1492,6 +1537,100 @@ private void appendIfNotEmpty(StringBuilder sb, String regEx, } } + /** + * Add MD5 hash as request header to the append request. + * @param requestHeaders to be updated with checksum header + * @param reqParams for getting offset and length + * @param buffer for getting input data for MD5 computation + * @throws AbfsRestOperationException if Md5 computation fails + */ + private void addCheckSumHeaderForWrite(List requestHeaders, + final AppendRequestParameters reqParams, final byte[] buffer) + throws AbfsRestOperationException { + String md5Hash = computeMD5Hash(buffer, reqParams.getoffset(), + reqParams.getLength()); + requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, md5Hash)); + } + + /** + * To verify the checksum information received from server for the data read. + * @param buffer stores the data received from server. + * @param result HTTP Operation Result. + * @param bufferOffset Position where data returned by server is saved in buffer. + * @throws AbfsRestOperationException if Md5Mismatch. 
+ */ + private void verifyCheckSumForRead(final byte[] buffer, + final AbfsHttpOperation result, final int bufferOffset) + throws AbfsRestOperationException { + // Number of bytes returned by server could be less than or equal to what + // caller requests. In case it is less, extra bytes will be initialized to 0 + // Server returned MD5 Hash will be computed on what server returned. + // We need to get exact data that server returned and compute its md5 hash + // Computed hash should be equal to what server returned. + int numberOfBytesRead = (int) result.getBytesReceived(); + if (numberOfBytesRead == 0) { + return; + } + String md5HashComputed = computeMD5Hash(buffer, bufferOffset, + numberOfBytesRead); + String md5HashActual = result.getResponseHeader(CONTENT_MD5); + if (!md5HashComputed.equals(md5HashActual)) { + LOG.debug("Md5 Mismatch Error in Read Operation. Server returned Md5: {}, Client computed Md5: {}", md5HashActual, md5HashComputed); + throw new AbfsInvalidChecksumException(result.getRequestId()); + } + } + + /** + * Conditions check for allowing checksum support for read operation. + * Sending MD5 Hash in request headers. For more details see + * @see + * Path - Read Azure Storage Rest API. + * 1. Range header must be present as one of the request headers. + * 2. buffer length must be less than or equal to 4 MB. + * @param requestHeaders to be checked for range header. + * @param rangeHeader must be present. + * @param bufferLength must be less than or equal to 4 MB. + * @return true if all conditions are met. + */ + private boolean isChecksumValidationEnabled(List requestHeaders, + final AbfsHttpHeader rangeHeader, final int bufferLength) { + return getAbfsConfiguration().getIsChecksumValidationEnabled() + && requestHeaders.contains(rangeHeader) && bufferLength <= 4 * ONE_MB; + } + + /** + * Conditions check for allowing checksum support for write operation. + * Server will support this if client sends the MD5 Hash as a request header. + * For azure stoage service documentation see + * @see + * Path - Update Azure Rest API. + * @return true if checksum validation enabled. + */ + private boolean isChecksumValidationEnabled() { + return getAbfsConfiguration().getIsChecksumValidationEnabled(); + } + + /** + * Compute MD5Hash of the given byte array starting from given offset up to given length. + * @param data byte array from which data is fetched to compute MD5 Hash. + * @param off offset in the array from where actual data starts. + * @param len length of the data to be used to compute MD5Hash. + * @return MD5 Hash of the data as String. + * @throws AbfsRestOperationException if computation fails. + */ + @VisibleForTesting + public String computeMD5Hash(final byte[] data, final int off, final int len) + throws AbfsRestOperationException { + try { + MessageDigest md5Digest = MessageDigest.getInstance(MD5); + md5Digest.update(data, off, len); + byte[] md5Bytes = md5Digest.digest(); + return Base64.getEncoder().encodeToString(md5Bytes); + } catch (NoSuchAlgorithmException ex) { + throw new AbfsDriverException(ex); + } + } + @VisibleForTesting URL getBaseUrl() { return baseUrl; diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 9021f3e3b1..3f2e89ad6f 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -941,7 +941,7 @@ string retrieved from a GetFileStatus request to the server. implementing EncryptionContextProvider. 
### Server Options -When the config `fs.azure.io.read.tolerate.concurrent.append` is made true, the +`fs.azure.io.read.tolerate.concurrent.append`: When the config is set to true, the If-Match header sent to the server for read calls will be set as * otherwise the same will be set with ETag. This is a mechanism in place to handle the reads with optimistic concurrency. @@ -949,14 +949,23 @@ Please refer to the following links for further information. 1. https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read 2. https://azure.microsoft.com/de-de/blog/managing-concurrency-in-microsoft-azure-storage-2/ -listStatus API fetches the FileStatus information from server in a page by page -manner. The config `fs.azure.list.max.results` used to set the maxResults URI - param which sets the pagesize(maximum results per call). The value should - be > 0. By default this will be 5000. Server has a maximum value for this - parameter as 5000. So even if the config is above 5000 the response will only +`fs.azure.list.max.results`: listStatus API fetches the FileStatus information +from the server in a page-by-page manner. The config is used to set the maxResults URI +param which sets the page size (maximum results per call). The value should +be > 0. By default, this will be 5000. The server has a maximum value of 5000 for this +parameter. So even if the config is above 5000 the response will only contain 5000 entries. Please refer to the following link for further information. https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/list +`fs.azure.enable.checksum.validation`: When the config is set to true, Content-MD5 +headers are sent to the server for read and append calls. This provides a way +to verify the integrity of data during transport. This will have a performance +impact due to MD5 hash re-computation on the client and server side. Please refer +to the Azure documentation for the +[Read](https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read) +and [Append](https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update) +APIs for more details. + ### Throttling Options ABFS driver has the capability to throttle read and write operations to achieve maximum throughput by minimizing errors. The errors occur when the account diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 66a1b22da9..16f2025f21 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -145,6 +145,9 @@ protected AbstractAbfsIntegrationTest() throws Exception { } else { this.isIPAddress = false; } + + // For tests, we want to enforce checksum validation so that any regressions can be caught. 
+ abfsConfig.setIsChecksumValidationEnabled(true); } protected boolean getIsNamespaceEnabled(AzureBlobFileSystem fs) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java new file mode 100644 index 0000000000..a23f500d5f --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.security.SecureRandom; +import java.util.Arrays; +import java.util.HashSet; + +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.impl.OpenFileParameters; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.ArgumentMatchers.any; + +/** + * Test For Verifying Checksum Related Operations + */ +public class ITestAzureBlobFileSystemChecksum extends AbstractAbfsIntegrationTest { + + private static final int MB_2 = 2 * ONE_MB; + private static final int MB_3 = 3 * ONE_MB; + private static final int MB_4 = 4 * ONE_MB; + private static final int MB_8 = 8 * ONE_MB; + private static final int MB_15 = 15 * ONE_MB; + private static final int MB_16 = 16 * ONE_MB; + private static final String INVALID_MD5_TEXT = "Text for Invalid MD5 Computation"; + + public ITestAzureBlobFileSystemChecksum() throws Exception { + super(); + } + + @Test + public void testWriteReadWithChecksum() throws Exception { + testWriteReadWithChecksumInternal(true); + testWriteReadWithChecksumInternal(false); + } + + @Test + public void testAppendWithChecksumAtDifferentOffsets() throws Exception { + AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, true); + 
AbfsClient client = fs.getAbfsStore().getClient(); + Path path = path("testPath" + getMethodName()); + try (FSDataOutputStream out = fs.create(path)) { + byte[] data = generateRandomBytes(MB_4); + + appendWithOffsetHelper(client, path, data, fs, 0); + appendWithOffsetHelper(client, path, data, fs, ONE_MB); + appendWithOffsetHelper(client, path, data, fs, MB_2); + appendWithOffsetHelper(client, path, data, fs, MB_4 - 1); + } + } + + @Test + public void testReadWithChecksumAtDifferentOffsets() throws Exception { + AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, true); + AbfsClient client = fs.getAbfsStore().getClient(); + Path path = path("testPath" + getMethodName()); + byte[] data = generateRandomBytes(MB_16); + + createFileWithData(path, data, fs); + readWithOffsetAndPositionHelper(client, path, data, fs, 0, 0); + readWithOffsetAndPositionHelper(client, path, data, fs, MB_4, 0); + readWithOffsetAndPositionHelper(client, path, data, fs, MB_4, ONE_MB); + readWithOffsetAndPositionHelper(client, path, data, fs, MB_8, MB_2); + readWithOffsetAndPositionHelper(client, path, data, fs, MB_15, MB_4 - 1); + } + + @Test + public void testWriteReadWithChecksumAndOptions() throws Exception { + testWriteReadWithChecksumAndOptionsInternal(true); + testWriteReadWithChecksumAndOptionsInternal(false); + } + + @Test + public void testAbfsInvalidChecksumExceptionInAppend() throws Exception { + AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, true); + AbfsClient spiedClient = Mockito.spy(fs.getAbfsStore().getClient()); + Path path = path("testPath" + getMethodName()); + fs.create(path); + byte[] data= generateRandomBytes(MB_4); + String invalidMD5Hash = spiedClient.computeMD5Hash( + INVALID_MD5_TEXT.getBytes(), 0, INVALID_MD5_TEXT.length()); + Mockito.doReturn(invalidMD5Hash).when(spiedClient).computeMD5Hash(any(), + any(Integer.class), any(Integer.class)); + AbfsRestOperationException ex = intercept(AbfsInvalidChecksumException.class, () -> { + appendWithOffsetHelper(spiedClient, path, data, fs, 0); + }); + + Assertions.assertThat(ex.getErrorCode()) + .describedAs("Exception Message should contain MD5Mismatch") + .isEqualTo(AzureServiceErrorCode.MD5_MISMATCH); + } + + @Test + public void testAbfsInvalidChecksumExceptionInRead() throws Exception { + AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, true); + AbfsClient spiedClient = Mockito.spy(fs.getAbfsStore().getClient()); + Path path = path("testPath" + getMethodName()); + byte[] data = generateRandomBytes(MB_3); + createFileWithData(path, data, fs); + + String invalidMD5Hash = spiedClient.computeMD5Hash( + INVALID_MD5_TEXT.getBytes(), 0, INVALID_MD5_TEXT.length()); + Mockito.doReturn(invalidMD5Hash).when(spiedClient).computeMD5Hash(any(), + any(Integer.class), any(Integer.class)); + + intercept(AbfsInvalidChecksumException.class, () -> { + readWithOffsetAndPositionHelper(spiedClient, path, data, fs, 0, 0); + }); + } + + private void testWriteReadWithChecksumInternal(final boolean readAheadEnabled) + throws Exception { + AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, readAheadEnabled); + final int dataSize = MB_16 + 1000; + Path testPath = path("testPath" + getMethodName()); + byte[] bytesUploaded = generateRandomBytes(dataSize); + + createFileWithData(testPath, bytesUploaded, fs); + + try (FSDataInputStream in = fs.open(testPath)) { + byte[] bytesRead = new byte[bytesUploaded.length]; + in.read(bytesRead, 0, dataSize); + + // Verify that the data read is same as data written + 
Assertions.assertThat(bytesRead) + .describedAs("Bytes read with checksum enabled are not as expected") + .containsExactly(bytesUploaded); + } + } + + /** + * Verify that the checksum computed on client side matches with the one + * computed at server side. If not, request will fail with 400 Bad request. + * @param client + * @param path + * @param data + * @param fs + * @param offset + * @throws Exception + */ + private void appendWithOffsetHelper(AbfsClient client, Path path, + byte[] data, AzureBlobFileSystem fs, final int offset) throws Exception { + AppendRequestParameters reqParams = new AppendRequestParameters( + 0, offset, data.length - offset, APPEND_MODE, false, null, true); + client.append(path.toUri().getPath(), data, reqParams, null, null, + getTestTracingContext(fs, false)); + } + + /** + * Verify that the checksum returned by server is same as computed on client + * side even when read from different positions and stored at different offsets + * If not server request will pass but client.read() will fail with + * {@link org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException} + * @param client + * @param path + * @param data + * @param fs + * @param position + * @param offset + * @throws Exception + */ + private void readWithOffsetAndPositionHelper(AbfsClient client, Path path, + byte[] data, AzureBlobFileSystem fs, final int position, + final int offset) throws Exception { + + int bufferLength = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize(); + byte[] readBuffer = new byte[bufferLength]; + final int readLength = bufferLength - offset; + + client.read(path.toUri().getPath(), position, readBuffer, offset, readLength, + "*", null, null, getTestTracingContext(fs, false)); + + byte[] actual = Arrays.copyOfRange(readBuffer, offset, offset + readLength); + byte[] expected = Arrays.copyOfRange(data, position, readLength + position); + Assertions.assertThat(actual) + .describedAs("Data read should be same as Data Written") + .containsExactly(expected); + } + + private void testWriteReadWithChecksumAndOptionsInternal( + final boolean readAheadEnabled) throws Exception { + AzureBlobFileSystem fs = getConfiguredFileSystem(MB_8, ONE_MB, readAheadEnabled); + final int dataSize = MB_16 + 1000; + + Path testPath = path("testPath" + getMethodName()); + byte[] bytesUploaded = generateRandomBytes(dataSize); + createFileWithData(testPath, bytesUploaded, fs); + + Configuration cpm1 = new Configuration(); + cpm1.setBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, true); + try (FSDataInputStream in = fs.openFileWithOptions(testPath, + new OpenFileParameters().withOptions(cpm1) + .withMandatoryKeys(new HashSet<>())).get()) { + byte[] bytesRead = new byte[dataSize]; + + in.read(1, bytesRead, 1, MB_4); + + // Verify that the data read is same as data written + Assertions.assertThat(Arrays.copyOfRange(bytesRead, 1, MB_4)) + .describedAs("Bytes read with checksum enabled are not as expected") + .containsExactly(Arrays.copyOfRange(bytesUploaded, 1, MB_4)); + } + } + + private void createFileWithData(Path path, byte[] data, AzureBlobFileSystem fs) throws Exception { + try (FSDataOutputStream out = fs.create(path)) { + out.write(data); + out.hflush(); + } + } + + private AzureBlobFileSystem getConfiguredFileSystem(final int writeBuffer, + final int readBuffer, final boolean readAheadEnabled) throws Exception { + AzureBlobFileSystem fs = createFileSystem(); + AbfsConfiguration abfsConf = fs.getAbfsStore().getAbfsConfiguration(); + 
abfsConf.setIsChecksumValidationEnabled(true); + abfsConf.setWriteBufferSize(writeBuffer); + abfsConf.setReadBufferSize(readBuffer); + abfsConf.setReadAheadEnabled(readAheadEnabled); + return fs; + } + + public static byte[] generateRandomBytes(int numBytes) { + SecureRandom secureRandom = new SecureRandom(); + byte[] randomBytes = new byte[numBytes]; + secureRandom.nextBytes(randomBytes); + return randomBytes; + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 11b14162eb..b27d92c319 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -170,7 +170,9 @@ protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely) getAbfsStore(fs).getAbfsConfiguration() .setReadSmallFilesCompletely(readSmallFilesCompletely); getAbfsStore(fs).getAbfsConfiguration() - .setOptimizeFooterRead(false); + .setOptimizeFooterRead(false); + getAbfsStore(fs).getAbfsConfiguration() + .setIsChecksumValidationEnabled(true); return fs; } @@ -179,6 +181,8 @@ private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead, final AzureBlobFileSystem fs = getFileSystem(); getAbfsStore(fs).getAbfsConfiguration() .setOptimizeFooterRead(optimizeFooterRead); + getAbfsStore(fs).getAbfsConfiguration() + .setIsChecksumValidationEnabled(true); if (fileSize <= getAbfsStore(fs).getAbfsConfiguration() .getReadBufferSize()) { getAbfsStore(fs).getAbfsConfiguration()
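
Reviewer note (not part of the patch): the new switch is off by default, so none of the checksum code paths run unless `fs.azure.enable.checksum.validation` is enabled. A minimal sketch of opting in through the standard Hadoop `Configuration` API; the class name and the final print are illustrative only:

```java
import org.apache.hadoop.conf.Configuration;

public class EnableChecksumValidation {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key added by this patch as FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION;
    // DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION keeps it false unless opted in.
    conf.setBoolean("fs.azure.enable.checksum.validation", true);
    // A filesystem created from this conf sends Content-MD5 on appends and
    // x-ms-range-get-content-md5 on reads of ranges up to 4 MB.
    System.out.println(conf.get("fs.azure.enable.checksum.validation"));
  }
}
```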
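And a self-contained sketch of the Base64-encoded MD5 round trip that `computeMD5Hash` and `verifyCheckSumForRead` perform; the payload, class, and helper names here are made up, but the digest and encode calls mirror the patch:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class Md5RoundTripSketch {

  // Mirrors AbfsClient#computeMD5Hash: MD5 over data[off, off+len),
  // Base64-encoded, which is the wire format of the Content-MD5 header.
  static String md5(byte[] data, int off, int len) throws NoSuchAlgorithmException {
    MessageDigest digest = MessageDigest.getInstance("MD5");
    digest.update(data, off, len);
    return Base64.getEncoder().encodeToString(digest.digest());
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    byte[] payload = "sample abfs block".getBytes(StandardCharsets.UTF_8);
    // Writer side: hash sent alongside the append request in Content-MD5.
    String sent = md5(payload, 0, payload.length);
    // Reader side: hash recomputed over exactly the bytes received; the driver
    // raises AbfsInvalidChecksumException when the two values differ.
    String received = md5(payload, 0, payload.length);
    System.out.println(sent.equals(received) ? "checksum ok" : "Md5Mismatch");
  }
}
```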