HADOOP-18684. S3A filesystem to support binding to to other URI schemes (#5521)
Contributed by Harshit Gupta
This commit is contained in:
parent
3400e8257e
commit
42ed2b9075
@ -18,14 +18,16 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.fs.s3a;
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.DelegateToFileSystem;
|
import org.apache.hadoop.fs.DelegateToFileSystem;
|
||||||
|
|
||||||
import java.io.IOException;
|
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
|
||||||
import java.net.URI;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* S3A implementation of AbstractFileSystem.
|
* S3A implementation of AbstractFileSystem.
|
||||||
@ -37,7 +39,8 @@ public class S3A extends DelegateToFileSystem {
|
|||||||
|
|
||||||
public S3A(URI theUri, Configuration conf)
|
public S3A(URI theUri, Configuration conf)
|
||||||
throws IOException, URISyntaxException {
|
throws IOException, URISyntaxException {
|
||||||
super(theUri, new S3AFileSystem(), conf, "s3a", false);
|
super(theUri, new S3AFileSystem(), conf,
|
||||||
|
theUri.getScheme().isEmpty() ? FS_S3A : theUri.getScheme(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -404,6 +404,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||||||
*/
|
*/
|
||||||
private final Set<Path> deleteOnExit = new TreeSet<>();
|
private final Set<Path> deleteOnExit = new TreeSet<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scheme for the current filesystem.
|
||||||
|
*/
|
||||||
|
private String scheme = FS_S3A;
|
||||||
|
|
||||||
/** Add any deprecated keys. */
|
/** Add any deprecated keys. */
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private static void addDeprecatedKeys() {
|
private static void addDeprecatedKeys() {
|
||||||
@ -617,6 +622,7 @@ public void initialize(URI name, Configuration originalConf)
|
|||||||
vectoredActiveRangeReads = intOption(conf,
|
vectoredActiveRangeReads = intOption(conf,
|
||||||
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
|
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
|
||||||
vectoredIOContext = populateVectoredIOContext(conf);
|
vectoredIOContext = populateVectoredIOContext(conf);
|
||||||
|
scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
|
||||||
} catch (AmazonClientException e) {
|
} catch (AmazonClientException e) {
|
||||||
// amazon client exception: stop all services then throw the translation
|
// amazon client exception: stop all services then throw the translation
|
||||||
cleanupWithLogger(LOG, span);
|
cleanupWithLogger(LOG, span);
|
||||||
@ -1162,7 +1168,7 @@ public void abortOutstandingMultipartUploads(long seconds)
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String getScheme() {
|
public String getScheme() {
|
||||||
return "s3a";
|
return this.scheme;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -0,0 +1,51 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
public class ITestS3AUrlScheme extends AbstractS3ATestBase{
|
||||||
|
@Override
|
||||||
|
protected Configuration createConfiguration() {
|
||||||
|
Configuration conf = super.createConfiguration();
|
||||||
|
conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFSScheme() throws IOException, URISyntaxException {
|
||||||
|
FileSystem fs = FileSystem.get(new URI("s3://mybucket/path"),
|
||||||
|
getConfiguration());
|
||||||
|
try {
|
||||||
|
assertEquals("s3", fs.getScheme());
|
||||||
|
Path path = fs.makeQualified(new Path("tmp/path"));
|
||||||
|
assertEquals("s3", path.toUri().getScheme());
|
||||||
|
} finally {
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -13,11 +13,34 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.fs.s3a.fileContext;
|
package org.apache.hadoop.fs.s3a.fileContext;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.TestFileContext;
|
import org.apache.hadoop.fs.TestFileContext;
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of TestFileContext for S3a.
|
* Implementation of TestFileContext for S3a.
|
||||||
*/
|
*/
|
||||||
public class ITestS3AFileContext extends TestFileContext{
|
public class ITestS3AFileContext extends TestFileContext {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheme()
|
||||||
|
throws URISyntaxException, UnsupportedFileSystemException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
URI uri = new URI("s3://mybucket/path");
|
||||||
|
conf.set("fs.AbstractFileSystem.s3.impl",
|
||||||
|
"org.apache.hadoop.fs.s3a.S3A");
|
||||||
|
FileContext fc = FileContext.getFileContext(uri, conf);
|
||||||
|
assertEquals("s3", fc.getDefaultFileSystem().getUri().getScheme());
|
||||||
|
Path path = fc.makeQualified(new Path("tmp/path"));
|
||||||
|
assertEquals("s3", path.toUri().getScheme());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user