HADOOP-18869: [ABFS] Fix behavior of a File System APIs on root path (#6003)
Contributed by Anmol Asrani
This commit is contained in:
parent
9c621fcea7
commit
6c6df40d35
@ -110,6 +110,7 @@
|
||||
import org.apache.hadoop.util.LambdaUtils;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_CONFLICT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
|
||||
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
|
||||
@ -120,6 +121,7 @@
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
|
||||
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT;
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
|
||||
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
|
||||
@ -324,6 +326,12 @@ public FSDataOutputStream create(final Path f,
|
||||
|
||||
statIncrement(CALL_CREATE);
|
||||
trailingPeriodCheck(f);
|
||||
if (f.isRoot()) {
|
||||
throw new AbfsRestOperationException(HTTP_CONFLICT,
|
||||
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
|
||||
ERR_CREATE_ON_ROOT,
|
||||
null);
|
||||
}
|
||||
|
||||
Path qualifiedPath = makeQualified(f);
|
||||
|
||||
@ -348,6 +356,12 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
|
||||
final Progressable progress) throws IOException {
|
||||
|
||||
statIncrement(CALL_CREATE_NON_RECURSIVE);
|
||||
if (f.isRoot()) {
|
||||
throw new AbfsRestOperationException(HTTP_CONFLICT,
|
||||
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
|
||||
ERR_CREATE_ON_ROOT,
|
||||
null);
|
||||
}
|
||||
final Path parent = f.getParent();
|
||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||
fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat,
|
||||
@ -965,15 +979,25 @@ public void setXAttr(final Path path,
|
||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||
fileSystemId, FSOperationType.SET_ATTR, true, tracingHeaderFormat,
|
||||
listener);
|
||||
Hashtable<String, String> properties = abfsStore
|
||||
.getPathStatus(qualifiedPath, tracingContext);
|
||||
Hashtable<String, String> properties;
|
||||
String xAttrName = ensureValidAttributeName(name);
|
||||
|
||||
if (path.isRoot()) {
|
||||
properties = abfsStore.getFilesystemProperties(tracingContext);
|
||||
} else {
|
||||
properties = abfsStore.getPathStatus(qualifiedPath, tracingContext);
|
||||
}
|
||||
|
||||
boolean xAttrExists = properties.containsKey(xAttrName);
|
||||
XAttrSetFlag.validate(name, xAttrExists, flag);
|
||||
|
||||
String xAttrValue = abfsStore.decodeAttribute(value);
|
||||
properties.put(xAttrName, xAttrValue);
|
||||
if (path.isRoot()) {
|
||||
abfsStore.setFilesystemProperties(properties, tracingContext);
|
||||
} else {
|
||||
abfsStore.setPathProperties(qualifiedPath, properties, tracingContext);
|
||||
}
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
checkException(path, ex);
|
||||
}
|
||||
@ -1005,9 +1029,15 @@ public byte[] getXAttr(final Path path, final String name)
|
||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||
fileSystemId, FSOperationType.GET_ATTR, true, tracingHeaderFormat,
|
||||
listener);
|
||||
Hashtable<String, String> properties = abfsStore
|
||||
.getPathStatus(qualifiedPath, tracingContext);
|
||||
Hashtable<String, String> properties;
|
||||
String xAttrName = ensureValidAttributeName(name);
|
||||
|
||||
if (path.isRoot()) {
|
||||
properties = abfsStore.getFilesystemProperties(tracingContext);
|
||||
} else {
|
||||
properties = abfsStore.getPathStatus(qualifiedPath, tracingContext);
|
||||
}
|
||||
|
||||
if (properties.containsKey(xAttrName)) {
|
||||
String xAttrValue = properties.get(xAttrName);
|
||||
value = abfsStore.encodeAttribute(xAttrValue);
|
||||
@ -1488,7 +1518,7 @@ public static void checkException(final Path path,
|
||||
case HttpURLConnection.HTTP_NOT_FOUND:
|
||||
throw (IOException) new FileNotFoundException(message)
|
||||
.initCause(exception);
|
||||
case HttpURLConnection.HTTP_CONFLICT:
|
||||
case HTTP_CONFLICT:
|
||||
throw (IOException) new FileAlreadyExistsException(message)
|
||||
.initCause(exception);
|
||||
case HttpURLConnection.HTTP_FORBIDDEN:
|
||||
|
@ -494,7 +494,7 @@ public void setPathProperties(final Path path,
|
||||
final Hashtable<String, String> properties, TracingContext tracingContext)
|
||||
throws AzureBlobFileSystemException {
|
||||
try (AbfsPerfInfo perfInfo = startTracking("setPathProperties", "setPathProperties")){
|
||||
LOG.debug("setFilesystemProperties for filesystem: {} path: {} with properties: {}",
|
||||
LOG.debug("setPathProperties for filesystem: {} path: {} with properties: {}",
|
||||
client.getFileSystem(),
|
||||
path,
|
||||
properties);
|
||||
|
@ -48,6 +48,6 @@ public final class AbfsErrors {
|
||||
+ "operation";
|
||||
public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads "
|
||||
+ "configured, set " + FS_AZURE_LEASE_THREADS;
|
||||
|
||||
public static final String ERR_CREATE_ON_ROOT = "Cannot create file over root path";
|
||||
private AbfsErrors() {}
|
||||
}
|
||||
|
@ -21,7 +21,7 @@
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.junit.Assume;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -46,37 +46,14 @@ public ITestAzureBlobFileSystemAttributes() throws Exception {
|
||||
public void testSetGetXAttr() throws Exception {
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
|
||||
Assume.assumeTrue(getIsNamespaceEnabled(fs));
|
||||
|
||||
byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("hi");
|
||||
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("你好");
|
||||
String attributeName1 = "user.asciiAttribute";
|
||||
String attributeName2 = "user.unicodeAttribute";
|
||||
Path testFile = path("setGetXAttr");
|
||||
|
||||
// after creating a file, the xAttr should not be present
|
||||
touch(testFile);
|
||||
assertNull(fs.getXAttr(testFile, attributeName1));
|
||||
|
||||
// after setting the xAttr on the file, the value should be retrievable
|
||||
fs.registerListener(
|
||||
new TracingHeaderValidator(conf.getClientCorrelationId(),
|
||||
fs.getFileSystemId(), FSOperationType.SET_ATTR, true, 0));
|
||||
fs.setXAttr(testFile, attributeName1, attributeValue1);
|
||||
fs.setListenerOperation(FSOperationType.GET_ATTR);
|
||||
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
|
||||
fs.registerListener(null);
|
||||
|
||||
// after setting a second xAttr on the file, the first xAttr values should not be overwritten
|
||||
fs.setXAttr(testFile, attributeName2, attributeValue2);
|
||||
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
|
||||
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName2));
|
||||
final Path testPath = path("setGetXAttr");
|
||||
fs.create(testPath);
|
||||
testGetSetXAttrHelper(fs, testPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetGetXAttrCreateReplace() throws Exception {
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
Assume.assumeTrue(getIsNamespaceEnabled(fs));
|
||||
byte[] attributeValue = fs.getAbfsStore().encodeAttribute("one");
|
||||
String attributeName = "user.someAttribute";
|
||||
Path testFile = path("createReplaceXAttr");
|
||||
@ -84,7 +61,9 @@ public void testSetGetXAttrCreateReplace() throws Exception {
|
||||
// after creating a file, it must be possible to create a new xAttr
|
||||
touch(testFile);
|
||||
fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
|
||||
assertArrayEquals(attributeValue, fs.getXAttr(testFile, attributeName));
|
||||
Assertions.assertThat(fs.getXAttr(testFile, attributeName))
|
||||
.describedAs("Retrieved Attribute Value is Not as Expected")
|
||||
.containsExactly(attributeValue);
|
||||
|
||||
// however after the xAttr is created, creating it again must fail
|
||||
intercept(IOException.class, () -> fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG));
|
||||
@ -93,7 +72,6 @@ public void testSetGetXAttrCreateReplace() throws Exception {
|
||||
@Test
|
||||
public void testSetGetXAttrReplace() throws Exception {
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
Assume.assumeTrue(getIsNamespaceEnabled(fs));
|
||||
byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("one");
|
||||
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("two");
|
||||
String attributeName = "user.someAttribute";
|
||||
@ -108,6 +86,72 @@ public void testSetGetXAttrReplace() throws Exception {
|
||||
// however after the xAttr is created, replacing it must succeed
|
||||
fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG);
|
||||
fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG);
|
||||
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName));
|
||||
Assertions.assertThat(fs.getXAttr(testFile, attributeName))
|
||||
.describedAs("Retrieved Attribute Value is Not as Expected")
|
||||
.containsExactly(attributeValue2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSetXAttrOnRoot() throws Exception {
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
final Path testPath = new Path("/");
|
||||
testGetSetXAttrHelper(fs, testPath);
|
||||
}
|
||||
|
||||
private void testGetSetXAttrHelper(final AzureBlobFileSystem fs,
|
||||
final Path testPath) throws Exception {
|
||||
|
||||
String attributeName1 = "user.attribute1";
|
||||
String attributeName2 = "user.attribute2";
|
||||
String decodedAttributeValue1 = "hi";
|
||||
String decodedAttributeValue2 = "hello";
|
||||
byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute(decodedAttributeValue1);
|
||||
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute(decodedAttributeValue2);
|
||||
|
||||
// Attribute not present initially
|
||||
Assertions.assertThat(fs.getXAttr(testPath, attributeName1))
|
||||
.describedAs("Cannot get attribute before setting it")
|
||||
.isNull();
|
||||
Assertions.assertThat(fs.getXAttr(testPath, attributeName2))
|
||||
.describedAs("Cannot get attribute before setting it")
|
||||
.isNull();
|
||||
|
||||
// Set the Attributes
|
||||
fs.registerListener(
|
||||
new TracingHeaderValidator(fs.getAbfsStore().getAbfsConfiguration()
|
||||
.getClientCorrelationId(),
|
||||
fs.getFileSystemId(), FSOperationType.SET_ATTR, true, 0));
|
||||
fs.setXAttr(testPath, attributeName1, attributeValue1);
|
||||
|
||||
// Check if the attribute is retrievable
|
||||
fs.setListenerOperation(FSOperationType.GET_ATTR);
|
||||
byte[] rv = fs.getXAttr(testPath, attributeName1);
|
||||
Assertions.assertThat(rv)
|
||||
.describedAs("Retrieved Attribute Does not Matches in Encoded Form")
|
||||
.containsExactly(attributeValue1);
|
||||
Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv))
|
||||
.describedAs("Retrieved Attribute Does not Matches in Decoded Form")
|
||||
.isEqualTo(decodedAttributeValue1);
|
||||
fs.registerListener(null);
|
||||
|
||||
// Set the second Attribute
|
||||
fs.setXAttr(testPath, attributeName2, attributeValue2);
|
||||
|
||||
// Check all the attributes present and previous Attribute not overridden
|
||||
rv = fs.getXAttr(testPath, attributeName1);
|
||||
Assertions.assertThat(rv)
|
||||
.describedAs("Retrieved Attribute Does not Matches in Encoded Form")
|
||||
.containsExactly(attributeValue1);
|
||||
Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv))
|
||||
.describedAs("Retrieved Attribute Does not Matches in Decoded Form")
|
||||
.isEqualTo(decodedAttributeValue1);
|
||||
|
||||
rv = fs.getXAttr(testPath, attributeName2);
|
||||
Assertions.assertThat(rv)
|
||||
.describedAs("Retrieved Attribute Does not Matches in Encoded Form")
|
||||
.containsExactly(attributeValue2);
|
||||
Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv))
|
||||
.describedAs("Retrieved Attribute Does not Matches in Decoded Form")
|
||||
.isEqualTo(decodedAttributeValue2);
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@
|
||||
import java.util.EnumSet;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -33,6 +34,7 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
@ -146,6 +148,26 @@ public void testCreateNonRecursive2() throws Exception {
|
||||
assertIsFile(fs, testFile);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateOnRoot() throws Exception {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
Path testFile = path(AbfsHttpConstants.ROOT_PATH);
|
||||
AbfsRestOperationException ex = intercept(AbfsRestOperationException.class, () ->
|
||||
fs.create(testFile, true));
|
||||
if (ex.getStatusCode() != HTTP_CONFLICT) {
|
||||
// Request should fail with 409.
|
||||
throw ex;
|
||||
}
|
||||
|
||||
ex = intercept(AbfsRestOperationException.class, () ->
|
||||
fs.createNonRecursive(testFile, FsPermission.getDefault(),
|
||||
false, 1024, (short) 1, 1024, null));
|
||||
if (ex.getStatusCode() != HTTP_CONFLICT) {
|
||||
// Request should fail with 409.
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to use to the ABFS stream after it is closed.
|
||||
*/
|
||||
@ -190,7 +212,8 @@ public void testTryWithResources() throws Throwable {
|
||||
// the exception raised in close() must be in the caught exception's
|
||||
// suppressed list
|
||||
Throwable[] suppressed = fnfe.getSuppressed();
|
||||
assertEquals("suppressed count", 1, suppressed.length);
|
||||
Assertions.assertThat(suppressed.length)
|
||||
.describedAs("suppressed count should be 1").isEqualTo(1);
|
||||
Throwable inner = suppressed[0];
|
||||
if (!(inner instanceof IOException)) {
|
||||
throw inner;
|
||||
|
@ -19,7 +19,7 @@
|
||||
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
|
||||
<property>
|
||||
<name>fs.contract.test.root-tests-enabled</name>
|
||||
<value>false</value>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
|
Loading…
Reference in New Issue
Block a user