HADOOP-14518. Customize User-Agent header sent in HTTP/HTTPS requests by WASB. Contributed by Georgi Chalakov.

This commit is contained in:
Jitendra Pandey 2017-07-24 13:59:27 -07:00
parent c98201b5d8
commit f2921e51f0
6 changed files with 162 additions and 59 deletions

View File

@ -499,7 +499,15 @@
name to use for the service when the client wishes to make an RPC call.
</description>
</property>
<property>
<name>fs.azure.user.agent.prefix</name>
<value>unknown</value>
<description>
WASB passes a User-Agent header to the Azure back-end. By default the header
contains the WASB version, the Java Runtime version, the Azure Client library
version, and the value of the configuration option fs.azure.user.agent.prefix.
</description>
</property>
<property>
<name>hadoop.security.uid.cache.secs</name>

View File

@ -116,6 +116,7 @@ public void initializeMemberVariables() {
xmlPropsToSkipCompare.add("fs.azure.secure.mode");
xmlPropsToSkipCompare.add("fs.azure.authorization");
xmlPropsToSkipCompare.add("fs.azure.authorization.caching.enable");
xmlPropsToSkipCompare.add("fs.azure.user.agent.prefix");
// Deprecated properties. These should eventually be removed from the
// class.

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.VersionInfo;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -71,6 +72,10 @@
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.StorageErrorCode;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.StorageEvent;
import com.microsoft.azure.storage.core.BaseRequest;
import com.microsoft.azure.storage.SendingRequestEvent;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
@ -83,13 +88,13 @@
/**
* Core implementation of Windows Azure Filesystem for Hadoop.
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage
*
*/
@InterfaceAudience.Private
@VisibleForTesting
public class AzureNativeFileSystemStore implements NativeFileSystemStore {
/**
* Configuration knob on whether we do block-level MD5 validation on
* upload/download.
@ -102,6 +107,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
static final String DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME = "storageemulator";
static final String STORAGE_EMULATOR_ACCOUNT_NAME_PROPERTY_NAME = "fs.azure.storage.emulator.account.name";
/**
* Configuration for User-Agent field.
*/
static final String USER_AGENT_ID_KEY = "fs.azure.user.agent.prefix";
static final String USER_AGENT_ID_DEFAULT = "unknown";
public static final Logger LOG = LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
private StorageInterface storageInteractionLayer;
@ -133,15 +144,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final String KEY_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
private static final String KEY_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
private static final String KEY_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
private static final String KEY_COPYBLOB_MIN_BACKOFF_INTERVAL =
private static final String KEY_COPYBLOB_MIN_BACKOFF_INTERVAL =
"fs.azure.io.copyblob.retry.min.backoff.interval";
private static final String KEY_COPYBLOB_MAX_BACKOFF_INTERVAL =
private static final String KEY_COPYBLOB_MAX_BACKOFF_INTERVAL =
"fs.azure.io.copyblob.retry.max.backoff.interval";
private static final String KEY_COPYBLOB_BACKOFF_INTERVAL =
private static final String KEY_COPYBLOB_BACKOFF_INTERVAL =
"fs.azure.io.copyblob.retry.backoff.interval";
private static final String KEY_COPYBLOB_MAX_IO_RETRIES =
"fs.azure.io.copyblob.retry.max.retries";
private static final String KEY_COPYBLOB_MAX_IO_RETRIES =
"fs.azure.io.copyblob.retry.max.retries";
private static final String KEY_SELF_THROTTLE_ENABLE = "fs.azure.selfthrottling.enable";
private static final String KEY_SELF_THROTTLE_READ_FACTOR = "fs.azure.selfthrottling.read.factor";
@ -188,7 +199,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* The set of directories where we should store files as page blobs.
*/
private Set<String> pageBlobDirs;
/**
* Configuration key to indicate the set of directories in WASB where
* we should do atomic folder rename synchronized with createNonRecursive.
@ -232,11 +243,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15;
private static final int DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL = 3 * 1000;
private static final int DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL = 90 * 1000;
private static final int DEFAULT_COPYBLOB_BACKOFF_INTERVAL = 30 * 1000;
private static final int DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS = 15;
private static final int DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS = 15;
// Self-throttling defaults. Allowed range = (0,1.0]
// Value of 1.0 means no self-throttling.
@ -306,6 +317,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private boolean useSecureMode = false;
private boolean useLocalSasKeyMode = false;
// User-Agent
private String userAgentId;
private String delegationToken;
/** The error message template when container is not accessible. */
@ -319,7 +333,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* A test hook interface that can modify the operation context we use for
* Azure Storage operations, e.g. to inject errors.
*/
@VisibleForTesting
@VisibleForTesting
// Test-only hook: an implementation is handed the operation context that is
// about to be used for an Azure Storage operation and returns the context
// that should be used instead (e.g. one modified to inject errors).
interface TestHookOperationContext {
// Takes the original operation context and returns the (possibly modified)
// context to use for the operation.
OperationContext modifyOperationContext(OperationContext original);
}
@ -336,11 +350,11 @@ void suppressRetryPolicy() {
/**
* Add a test hook to modify the operation context we use for Azure Storage
* operations.
*
*
* @param testHook
* The test hook, or null to unset previous hooks.
*/
@VisibleForTesting
@VisibleForTesting
void addTestHookToOperationContext(TestHookOperationContext testHook) {
  // Overwrites whatever hook was installed before; a null argument clears it.
  testHookOperationContext = testHook;
}
@ -358,7 +372,7 @@ private void suppressRetryPolicyInClientIfNeeded() {
/**
* Creates a JSON serializer that can serialize a PermissionStatus object into
* the JSON string we want in the blob metadata.
*
*
* @return The JSON serializer.
*/
private static JSON createPermissionJsonSerializer() {
@ -425,7 +439,7 @@ public BandwidthGaugeUpdater getBandwidthGaugeUpdater() {
/**
* Check if concurrent reads and writes on the same blob are allowed.
*
*
* @return true if concurrent reads and OOB writes have been configured, false
* otherwise.
*/
@ -437,11 +451,11 @@ private boolean isConcurrentOOBAppendAllowed() {
* Method for the URI and configuration object necessary to create a storage
* session with an Azure session. It parses the scheme to ensure it matches
* the storage protocol supported by this file system.
*
*
* @param uri - URI for target storage blob.
* @param conf - reference to configuration object.
* @param instrumentation - the metrics source that will keep track of operations here.
*
*
* @throws IllegalArgumentException if URI or job object is null, or invalid scheme.
*/
@Override
@ -504,6 +518,9 @@ public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentati
pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES);
LOG.debug("Page blob directories: {}", setToString(pageBlobDirs));
// User-agent
userAgentId = conf.get(USER_AGENT_ID_KEY, USER_AGENT_ID_DEFAULT);
// Extract directories that should have atomic rename applied.
atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES);
String hbaseRoot;
@ -539,7 +556,7 @@ private String setToString(Set<String> set) {
/**
* Method to extract the account name from an Azure URI.
*
*
* @param uri
* -- WASB blob URI
* @returns accountName -- the account name for the URI.
@ -590,7 +607,7 @@ private String getAccountFromAuthority(URI uri) throws URISyntaxException {
/**
* Method to extract the container name from an Azure URI.
*
*
* @param uri
* -- WASB blob URI
* @returns containerName -- the container name for the URI. May be null.
@ -641,7 +658,7 @@ private String getContainerFromAuthority(URI uri) throws URISyntaxException {
/**
* Return the appropriate scheme for communicating with
* Azure depending on whether wasb or wasbs is specified in the target URI.
*
*
* return scheme - HTTPS or HTTP as appropriate.
*/
private String getHTTPScheme() {
@ -663,7 +680,7 @@ private String getHTTPScheme() {
/**
* Set the configuration parameters for this client storage session with
* Azure.
*
*
* @throws AzureException
*/
private void configureAzureStorageSession() throws AzureException {
@ -763,10 +780,10 @@ private void configureAzureStorageSession() throws AzureException {
/**
* Connect to Azure storage using anonymous credentials.
*
*
* @param uri
* - URI to target blob (R/O access to public blob)
*
*
* @throws StorageException
* raised on errors communicating with Azure storage.
* @throws IOException
@ -893,7 +910,7 @@ private boolean isStorageEmulatorAccount(final String accountName) {
STORAGE_EMULATOR_ACCOUNT_NAME_PROPERTY_NAME,
DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME));
}
@VisibleForTesting
public static String getAccountKeyFromConfiguration(String accountName,
Configuration conf) throws KeyProviderException {
@ -930,7 +947,7 @@ public static String getAccountKeyFromConfiguration(String accountName,
* Establish a session with Azure blob storage based on the target URI. The
* method determines whether or not the URI target contains an explicit
* account or an implicit default cluster-wide account.
*
*
* @throws AzureException
* @throws IOException
*/
@ -983,7 +1000,7 @@ private void createAzureStorageSession ()
instrumentation.setAccountName(accountName);
String containerName = getContainerFromAuthority(sessionUri);
instrumentation.setContainerName(containerName);
// Check whether this is a storage emulator account.
if (isStorageEmulatorAccount(accountName)) {
// It is an emulator account, connect to it with no credentials.
@ -1086,7 +1103,7 @@ private static String trim(String s, String toTrim) {
*/
private String verifyAndConvertToStandardFormat(String rawDir) throws URISyntaxException {
URI asUri = new URI(rawDir);
if (asUri.getAuthority() == null
if (asUri.getAuthority() == null
|| asUri.getAuthority().toLowerCase(Locale.ENGLISH).equalsIgnoreCase(
sessionUri.getAuthority().toLowerCase(Locale.ENGLISH))) {
// Applies to me.
@ -1167,8 +1184,8 @@ public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
return false;
}
/**
* This should be called from any method that does any modifications to the
* underlying container: it makes sure to put the WASB current version in the
@ -1364,11 +1381,11 @@ public DataOutputStream storefile(String key, PermissionStatus permissionStatus)
* could mean either:
* (1) container=mycontainer; blob=myblob.txt
* (2) container=$root; blob=mycontainer/myblob.txt
*
*
* To avoid this type of ambiguity the Azure blob storage prevents
* arbitrary path under $root. For a simple and more consistent user
* experience it was decided to eliminate the opportunity for creating
* such paths by making the $root container read-only under WASB.
* such paths by making the $root container read-only under WASB.
*/
// Check that no attempt is made to write to blobs on default
@ -1445,7 +1462,7 @@ private InputStream openInputStream(CloudBlobWrapper blob)
/**
* Default permission to use when no permission metadata is found.
*
*
* @return The default permission to use.
*/
private static PermissionStatus defaultPermissionNoBlobMetadata() {
@ -1688,7 +1705,7 @@ public String getLinkInFileMetadata(String key) throws AzureException {
/**
* Private method to check for authenticated access.
*
*
* @ returns boolean -- true if access is credentialed and authenticated and
* false otherwise.
*/
@ -1708,7 +1725,7 @@ private boolean isAuthenticatedAccess() throws AzureException {
* original file system object was constructed with a short- or long-form URI.
* If the root directory is non-null the URI in the file constructor was in
* the long form.
*
*
* @param includeMetadata
* if set, the listed items will have their metadata populated
* already.
@ -1717,7 +1734,7 @@ private boolean isAuthenticatedAccess() throws AzureException {
*
* @returns blobItems : iterable collection of blob items.
* @throws URISyntaxException
*
*
*/
private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata,
boolean useFlatBlobListing) throws StorageException, URISyntaxException {
@ -1736,7 +1753,7 @@ private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata,
* the directory depending on whether the original file system object was
* constructed with a short- or long-form URI. If the root directory is
* non-null the URI in the file constructor was in the long form.
*
*
* @param aPrefix
* : string name representing the prefix of containing blobs.
* @param includeMetadata
@ -1744,10 +1761,10 @@ private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata,
* already.
* @param useFlatBlobListing
* if set the list is flat, otherwise it is hierarchical.
*
*
* @returns blobItems : iterable collection of blob items.
* @throws URISyntaxException
*
*
*/
private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean includeMetadata,
boolean useFlatBlobListing) throws StorageException, URISyntaxException {
@ -1769,7 +1786,7 @@ private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean includeMeta
* constructed with a short- or long-form URI. It also uses the specified flat
* or hierarchical option, listing details options, request options, and
* operation context.
*
*
* @param aPrefix
* string name representing the prefix of containing blobs.
* @param useFlatBlobListing
@ -1784,7 +1801,7 @@ private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean includeMeta
* - context of the current operation
* @returns blobItems : iterable collection of blob items.
* @throws URISyntaxException
*
*
*/
private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean useFlatBlobListing,
EnumSet<BlobListingDetails> listingDetails, BlobRequestOptions options,
@ -1804,13 +1821,13 @@ private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean useFlatBlob
* get the block blob reference depending on whether the original file system
* object was constructed with a short- or long-form URI. If the root
* directory is non-null the URI in the file constructor was in the long form.
*
*
* @param aKey
* : a key used to query Azure for the block blob.
* @returns blob : a reference to the Azure block blob corresponding to the
* key.
* @throws URISyntaxException
*
*
*/
private CloudBlobWrapper getBlobReference(String aKey)
throws StorageException, URISyntaxException {
@ -1831,10 +1848,10 @@ private CloudBlobWrapper getBlobReference(String aKey)
* This private method normalizes the key by stripping the container name from
* the path and returns a path relative to the root directory of the
* container.
*
*
* @param keyUri
* - adjust this key to a path relative to the root directory
*
*
* @returns normKey
*/
private String normalizeKey(URI keyUri) {
@ -1853,11 +1870,11 @@ private String normalizeKey(URI keyUri) {
* This private method normalizes the key by stripping the container name from
* the path and returns a path relative to the root directory of the
* container.
*
*
* @param blob
* - adjust the key to this blob to a path relative to the root
* directory
*
*
* @returns normKey
*/
private String normalizeKey(CloudBlobWrapper blob) {
@ -1868,11 +1885,11 @@ private String normalizeKey(CloudBlobWrapper blob) {
* This private method normalizes the key by stripping the container name from
* the path and returns a path relative to the root directory of the
* container.
*
*
* @param directory
* - adjust the key to this directory to a path relative to the root
* directory
*
*
* @returns normKey
*/
private String normalizeKey(CloudBlobDirectoryWrapper directory) {
@ -1889,7 +1906,7 @@ private String normalizeKey(CloudBlobDirectoryWrapper directory) {
* operation that has listeners hooked to it that will update the metrics for
* this file system. This method does not bind to receive send request
* callbacks by default.
*
*
* @return The OperationContext object to use.
*/
private OperationContext getInstrumentedContext() {
@ -1900,16 +1917,27 @@ private OperationContext getInstrumentedContext() {
/**
* Creates a new OperationContext for the Azure Storage operation that has
* listeners hooked to it that will update the metrics for this file system.
*
*
* @param bindConcurrentOOBIo
* - bind to intercept send request call backs to handle OOB I/O.
*
*
* @return The OperationContext object to use.
*/
private OperationContext getInstrumentedContext(boolean bindConcurrentOOBIo) {
OperationContext operationContext = new OperationContext();
// Set User-Agent
operationContext.getSendingRequestEventHandler().addListener(new StorageEvent<SendingRequestEvent>() {
@Override
public void eventOccurred(SendingRequestEvent eventArg) {
HttpURLConnection connection = (HttpURLConnection) eventArg.getConnectionObject();
String userAgentInfo = String.format(Utility.LOCALE_US, "WASB/%s (%s) %s",
VersionInfo.getVersion(), userAgentId, BaseRequest.getUserAgent());
connection.setRequestProperty(Constants.HeaderConstants.USER_AGENT, userAgentInfo);
}
});
if (selfThrottlingEnabled) {
SelfThrottlingIntercept.hook(operationContext, selfThrottlingReadFactor,
selfThrottlingWriteFactor);
@ -2096,7 +2124,7 @@ public PartialListing listAll(String prefix, final int maxListingCount,
/**
* Searches the given list of {@link FileMetadata} objects for a directory
* with the given key.
*
*
* @param list
* The list to search.
* @param key
@ -2229,7 +2257,7 @@ private PartialListing list(String prefix, String delimiter,
* Build up a metadata list of blobs in an Azure blob directory. This method
* uses an in-order first traversal of blob directory structures to maintain
* the sorted order of the blob names.
*
*
* @param aCloudBlobDirectory Azure blob directory
* @param aFileMetadataList a list of file metadata objects for each
* non-directory blob.
@ -2564,7 +2592,7 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
//
// Copy blob operation in Azure storage is very costly. It will be highly
// likely throttled during Azure storage gc. Short term fix will be using
// a more intensive exponential retry policy when the cluster is getting
// a more intensive exponential retry policy when the cluster is getting
// throttled.
try {
dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
@ -2585,10 +2613,10 @@ public void rename(String srcKey, String dstKey, boolean acquireLease,
int copyBlobMaxRetries = sessionConfiguration.getInt(
KEY_COPYBLOB_MAX_IO_RETRIES,
DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS);
BlobRequestOptions options = new BlobRequestOptions();
options.setRetryPolicyFactory(new RetryExponentialRetry(
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
copyBlobMaxRetries));
dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
} else {
@ -2794,7 +2822,7 @@ public void close() {
bandwidthGaugeUpdater = null;
}
}
// Finalizer to ensure complete shutdown
@Override
protected void finalize() throws Throwable {

View File

@ -192,6 +192,19 @@ The configuration option `fs.azure.page.blob.extension.size` is the page blob
extension size. This defines the amount to extend a page blob if it starts to
get full. It must be 128MB or greater, specified as an integer number of bytes.
### Custom User-Agent
WASB passes a User-Agent header to the Azure back-end. By default the header
contains the WASB version, the Java Runtime version, the Azure Client library version, and the
value of the configuration option `fs.azure.user.agent.prefix`. A customized User-Agent
header enables better troubleshooting and analysis by the Azure service.
```xml
<property>
<name>fs.azure.user.agent.prefix</name>
<value>Identifier</value>
</property>
```
### Atomic Folder Rename
Azure storage stores files as a flat key/value store without formal support

View File

@ -566,4 +566,52 @@ void excludeAndTestExpectations(Configuration config, String newPath)
CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, null);
assertEquals(newPath, effectivePath);
}
/**
 * Checks that the wasbs file system is still resolved correctly through the
 * AbstractFileSystem entry point, first with
 * {@code fs.azure.user.agent.prefix} set to a custom value and then with the
 * option removed from the configuration.
 *
 * Note: this test only checks that the file system comes up with either
 * configuration; it does not inspect the User-Agent header that is sent.
 */
@Test
public void testUserAgentConfig() throws Exception {
  // Set the user agent
  assertWasbsResolvesWithUserAgent("TestClient");

  // Unset the user agent
  assertWasbsResolvesWithUserAgent(null);
}

/**
 * Creates a mock test account, points the default file system at its wasbs
 * URI, applies the given user agent prefix and asserts that the default
 * AbstractFileSystem is a {@link Wasbs} instance with the expected URI.
 *
 * @param userAgentPrefix value for {@code fs.azure.user.agent.prefix}, or
 *                        null to remove the option from the configuration.
 * @throws Exception on any failure while setting up or cleaning up.
 */
private void assertWasbsResolvesWithUserAgent(String userAgentPrefix)
    throws Exception {
  try {
    testAccount = AzureBlobStorageTestAccount.createMock();
    Configuration conf = testAccount.getFileSystem().getConf();
    String authority = testAccount.getFileSystem().getUri().getAuthority();
    URI defaultUri = new URI("wasbs", authority, null, null, null);
    conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
    conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs");

    if (userAgentPrefix != null) {
      conf.set(AzureNativeFileSystemStore.USER_AGENT_ID_KEY, userAgentPrefix);
    } else {
      conf.unset(AzureNativeFileSystemStore.USER_AGENT_ID_KEY);
    }

    // The returned FileSystem is not used; the call is kept from the original
    // test. Presumably it exercises file system initialization with this
    // configuration -- TODO confirm.
    FileSystem.get(conf);

    AbstractFileSystem afs = FileContext.getFileContext(conf).getDefaultFileSystem();
    assertTrue(afs instanceof Wasbs);
    assertEquals(-1, afs.getUri().getPort());
    assertEquals("wasbs", afs.getUri().getScheme());
  } finally {
    try {
      // Guard against createMock() having failed with no account assigned, so
      // a NullPointerException here does not mask the original failure.
      if (testAccount != null) {
        testAccount.cleanup();
      }
    } finally {
      // Always close the file systems, even if the account cleanup throws.
      FileSystem.closeAll();
    }
  }
}
}

View File

@ -34,6 +34,11 @@
<value>true</value>
</property>
<property>
<name>fs.azure.user.agent.prefix</name>
<value>MSFT</value>
</property>
<!-- Save the above configuration properties in a separate file named -->
<!-- azure-auth-keys.xml in the same directory as this file. -->
<!-- DO NOT ADD azure-auth-keys.xml TO REVISION CONTROL. The keys to your -->