HADOOP-17194. Adding Context class for AbfsClient in ABFS (#2216)
Contributed by Mehakmeet Singh.
This commit is contained in:
parent
41182a9b6d
commit
d1c60a53f6
@ -84,6 +84,8 @@ import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
||||
@ -146,6 +148,7 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||
private final UserGroupInformation userGroupInformation;
|
||||
private final IdentityTransformerInterface identityTransformer;
|
||||
private final AbfsPerfTracker abfsPerfTracker;
|
||||
private final AbfsCounters abfsCounters;
|
||||
|
||||
/**
|
||||
* The set of directories where we should store files as append blobs.
|
||||
@ -192,7 +195,8 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||
boolean usingOauth = (authType == AuthType.OAuth);
|
||||
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
|
||||
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
|
||||
initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters);
|
||||
this.abfsCounters = abfsCounters;
|
||||
initializeClient(uri, fileSystemName, accountName, useHttps);
|
||||
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
|
||||
configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
|
||||
IdentityTransformerInterface.class);
|
||||
@ -1213,8 +1217,19 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
|
||||
}
|
||||
|
||||
/**
|
||||
* A on-off operation to initialize AbfsClient for AzureBlobFileSystem
|
||||
* Operations.
|
||||
*
|
||||
* @param uri Uniform resource identifier for Abfs.
|
||||
* @param fileSystemName Name of the fileSystem being used.
|
||||
* @param accountName Name of the account being used to access Azure
|
||||
* data store.
|
||||
* @param isSecure Tells if https is being used or http.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void initializeClient(URI uri, String fileSystemName,
|
||||
String accountName, boolean isSecure, AbfsCounters abfsCounters)
|
||||
String accountName, boolean isSecure)
|
||||
throws IOException {
|
||||
if (this.client != null) {
|
||||
return;
|
||||
@ -1261,16 +1276,30 @@ public class AzureBlobFileSystemStore implements Closeable {
|
||||
LOG.trace("Initializing AbfsClient for {}", baseUrl);
|
||||
if (tokenProvider != null) {
|
||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
||||
tokenProvider, abfsPerfTracker, abfsCounters);
|
||||
tokenProvider,
|
||||
populateAbfsClientContext());
|
||||
} else {
|
||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
||||
sasTokenProvider, abfsPerfTracker, abfsCounters);
|
||||
sasTokenProvider,
|
||||
populateAbfsClientContext());
|
||||
}
|
||||
LOG.trace("AbfsClient init complete");
|
||||
}
|
||||
|
||||
/**
|
||||
* Populate a new AbfsClientContext instance with the desired properties.
|
||||
*
|
||||
* @return an instance of AbfsClientContext.
|
||||
*/
|
||||
private AbfsClientContext populateAbfsClientContext() {
|
||||
return new AbfsClientContextBuilder()
|
||||
.withExponentialRetryPolicy(
|
||||
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()))
|
||||
.withAbfsCounters(abfsCounters)
|
||||
.withAbfsPerfTracker(abfsPerfTracker)
|
||||
.build();
|
||||
}
|
||||
|
||||
private String getOctalNotation(FsPermission fsPermission) {
|
||||
Preconditions.checkNotNull(fsPermission, "fsPermission");
|
||||
return String.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
|
||||
|
@ -77,15 +77,13 @@ public class AbfsClient implements Closeable {
|
||||
|
||||
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||
final AbfsConfiguration abfsConfiguration,
|
||||
final ExponentialRetryPolicy exponentialRetryPolicy,
|
||||
final AbfsPerfTracker abfsPerfTracker,
|
||||
final AbfsCounters abfsCounters) {
|
||||
final AbfsClientContext abfsClientContext) {
|
||||
this.baseUrl = baseUrl;
|
||||
this.sharedKeyCredentials = sharedKeyCredentials;
|
||||
String baseUrlString = baseUrl.toString();
|
||||
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
|
||||
this.abfsConfiguration = abfsConfiguration;
|
||||
this.retryPolicy = exponentialRetryPolicy;
|
||||
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
|
||||
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
|
||||
this.authType = abfsConfiguration.getAuthType(accountName);
|
||||
|
||||
@ -105,29 +103,23 @@ public class AbfsClient implements Closeable {
|
||||
}
|
||||
|
||||
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
|
||||
this.abfsPerfTracker = abfsPerfTracker;
|
||||
this.abfsCounters = abfsCounters;
|
||||
this.abfsPerfTracker = abfsClientContext.getAbfsPerfTracker();
|
||||
this.abfsCounters = abfsClientContext.getAbfsCounters();
|
||||
}
|
||||
|
||||
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||
final AbfsConfiguration abfsConfiguration,
|
||||
final ExponentialRetryPolicy exponentialRetryPolicy,
|
||||
final AccessTokenProvider tokenProvider,
|
||||
final AbfsPerfTracker abfsPerfTracker,
|
||||
final AbfsCounters abfsCounters) {
|
||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
|
||||
exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
|
||||
final AbfsClientContext abfsClientContext) {
|
||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
|
||||
this.tokenProvider = tokenProvider;
|
||||
}
|
||||
|
||||
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||
final AbfsConfiguration abfsConfiguration,
|
||||
final ExponentialRetryPolicy exponentialRetryPolicy,
|
||||
final SASTokenProvider sasTokenProvider,
|
||||
final AbfsPerfTracker abfsPerfTracker,
|
||||
final AbfsCounters abfsCounters) {
|
||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
|
||||
exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
|
||||
final AbfsClientContext abfsClientContext) {
|
||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
|
||||
this.sasTokenProvider = sasTokenProvider;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,51 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
/**
|
||||
* Class to hold extra configurations for AbfsClient and further classes
|
||||
* inside AbfsClient.
|
||||
*/
|
||||
public class AbfsClientContext {
|
||||
|
||||
private final ExponentialRetryPolicy exponentialRetryPolicy;
|
||||
private final AbfsPerfTracker abfsPerfTracker;
|
||||
private final AbfsCounters abfsCounters;
|
||||
|
||||
AbfsClientContext(
|
||||
ExponentialRetryPolicy exponentialRetryPolicy,
|
||||
AbfsPerfTracker abfsPerfTracker,
|
||||
AbfsCounters abfsCounters) {
|
||||
this.exponentialRetryPolicy = exponentialRetryPolicy;
|
||||
this.abfsPerfTracker = abfsPerfTracker;
|
||||
this.abfsCounters = abfsCounters;
|
||||
}
|
||||
|
||||
public ExponentialRetryPolicy getExponentialRetryPolicy() {
|
||||
return exponentialRetryPolicy;
|
||||
}
|
||||
|
||||
public AbfsPerfTracker getAbfsPerfTracker() {
|
||||
return abfsPerfTracker;
|
||||
}
|
||||
|
||||
public AbfsCounters getAbfsCounters() {
|
||||
return abfsCounters;
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
/**
|
||||
* A builder for AbfsClientContext class with different options to select and
|
||||
* build from.
|
||||
*/
|
||||
public class AbfsClientContextBuilder {
|
||||
|
||||
private ExponentialRetryPolicy exponentialRetryPolicy;
|
||||
private AbfsPerfTracker abfsPerfTracker;
|
||||
private AbfsCounters abfsCounters;
|
||||
|
||||
public AbfsClientContextBuilder withExponentialRetryPolicy(
|
||||
final ExponentialRetryPolicy exponentialRetryPolicy) {
|
||||
this.exponentialRetryPolicy = exponentialRetryPolicy;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsClientContextBuilder withAbfsPerfTracker(
|
||||
final AbfsPerfTracker abfsPerfTracker) {
|
||||
this.abfsPerfTracker = abfsPerfTracker;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsClientContextBuilder withAbfsCounters(final AbfsCounters abfsCounters) {
|
||||
this.abfsCounters = abfsCounters;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the context and get the instance with the properties selected.
|
||||
*
|
||||
* @return an instance of AbfsClientContext.
|
||||
*/
|
||||
public AbfsClientContext build() {
|
||||
//validate the values
|
||||
return new AbfsClientContext(exponentialRetryPolicy, abfsPerfTracker,
|
||||
abfsCounters);
|
||||
}
|
||||
}
|
@ -103,8 +103,9 @@ public final class TestAbfsClient {
|
||||
|
||||
private String getUserAgentString(AbfsConfiguration config,
|
||||
boolean includeSSLProvider) throws MalformedURLException {
|
||||
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().build();
|
||||
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
|
||||
config, null, (AccessTokenProvider) null, null, null);
|
||||
config, (AccessTokenProvider) null, abfsClientContext);
|
||||
String sslProviderName = null;
|
||||
if (includeSSLProvider) {
|
||||
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory()
|
||||
@ -257,6 +258,12 @@ public final class TestAbfsClient {
|
||||
abfsConfig.getAccountName(),
|
||||
abfsConfig);
|
||||
|
||||
AbfsClientContext abfsClientContext =
|
||||
new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
|
||||
.withExponentialRetryPolicy(
|
||||
new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()))
|
||||
.build();
|
||||
|
||||
// Create test AbfsClient
|
||||
AbfsClient testClient = new AbfsClient(
|
||||
baseAbfsClientInstance.getBaseUrl(),
|
||||
@ -267,11 +274,10 @@ public final class TestAbfsClient {
|
||||
abfsConfig.getStorageAccountKey())
|
||||
: null),
|
||||
abfsConfig,
|
||||
new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()),
|
||||
(currentAuthType == AuthType.OAuth
|
||||
? abfsConfig.getTokenProvider()
|
||||
: null),
|
||||
tracker, null);
|
||||
abfsClientContext);
|
||||
|
||||
return testClient;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user