HADOOP-16520. Race condition in DDB table init and waiting threads. (#1576). Contributed by Gabor Bota.

Fixes HADOOP-16349. DynamoDBMetadataStore.getVersionMarkerItem() to log at info/warn on retry

Change-Id: Ia83e92b9039ccb780090c99c41b4f71ef7539d35
This commit is contained in:
Gabor Bota 2019-10-11 12:08:47 +02:00 committed by GitHub
parent f267917ce3
commit 4a700c20d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 892 additions and 486 deletions

View File

@ -176,7 +176,7 @@ private Constants() {
// number of times we should retry errors
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
public static final int DEFAULT_MAX_ERROR_RETRIES = 20;
public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
// seconds until we give up trying to establish a connection to s3
public static final String ESTABLISH_TIMEOUT =

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.fs.s3a.s3guard;
import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
@ -28,7 +27,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -43,9 +41,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkBaseException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
@ -62,17 +58,9 @@
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.model.Tag;
import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import com.amazonaws.waiters.WaiterTimedOutException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -89,7 +77,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.AWSClientIOException;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
import org.apache.hadoop.fs.s3a.Constants;
@ -233,19 +220,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
OPERATIONS_LOG_NAME);
/** parent/child name to use in the version marker. */
public static final String VERSION_MARKER = "../VERSION";
public static final String VERSION_MARKER_ITEM_NAME = "../VERSION";
/** parent/child name to use in the version marker. */
public static final String VERSION_MARKER_TAG_NAME = "s3guard_version";
/** Current version number. */
public static final int VERSION = 100;
/** Error: version marker not found in table. */
public static final String E_NO_VERSION_MARKER
= "S3Guard table lacks version marker.";
/** Error: version mismatch. */
public static final String E_INCOMPATIBLE_VERSION
= "Database table is from an incompatible S3Guard version.";
@VisibleForTesting
static final String BILLING_MODE
= "billing-mode";
@ -305,14 +287,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
private String region;
private Table table;
private String tableName;
private String tableArn;
private Configuration conf;
private String username;
/**
* This policy is mostly for batched writes, not for processing
* exceptions in invoke() calls.
* It also has a role purpose in {@link #getVersionMarkerItem()};
* It also has a role purpose in
* {@link DynamoDBMetadataStoreTableManager#getVersionMarkerItem()};
* look at that method for the details.
*/
private RetryPolicy batchWriteRetryPolicy;
@ -359,6 +341,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
*/
private ITtlTimeProvider ttlTimeProvider;
private DynamoDBMetadataStoreTableManager tableHandler;
/**
* A utility function to create DynamoDB instance.
* @param conf the file system configuration
@ -437,7 +421,11 @@ public void initialize(FileSystem fs, ITtlTimeProvider ttlTp)
);
this.ttlTimeProvider = ttlTp;
initTable();
tableHandler = new DynamoDBMetadataStoreTableManager(
dynamoDB, tableName, region, amazonDynamoDB, conf, readOp,
batchWriteRetryPolicy);
this.table = tableHandler.initTable();
instrumentation.initialized();
}
@ -494,6 +482,7 @@ public void initialize(Configuration config,
conf = config;
// use the bucket as the DynamoDB table name if not specified in config
tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY);
Preconditions.checkArgument(!StringUtils.isEmpty(tableName),
"No DynamoDB table name configured");
region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
@ -518,7 +507,11 @@ public void initialize(Configuration config,
"s3a-ddb-" + tableName);
initDataAccessRetries(conf);
this.ttlTimeProvider = ttlTp;
initTable();
tableHandler = new DynamoDBMetadataStoreTableManager(
dynamoDB, tableName, region, amazonDynamoDB, conf, readOp,
batchWriteRetryPolicy);
this.table = tableHandler.initTable();
}
/**
@ -1438,32 +1431,7 @@ public synchronized void close() {
@Override
@Retries.RetryTranslated
public void destroy() throws IOException {
if (table == null) {
LOG.info("In destroy(): no table to delete");
return;
}
LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB");
try {
invoker.retry("delete", null, true,
() -> table.delete());
table.waitForDelete();
} catch (IllegalArgumentException ex) {
throw new TableDeleteTimeoutException(tableName,
"Timeout waiting for the table " + tableArn + " to be deleted",
ex);
} catch (FileNotFoundException rnfe) {
LOG.info("FileNotFoundException while deleting DynamoDB table {} in "
+ "region {}. This may indicate that the table does not exist, "
+ "or has been deleted by another concurrent thread or process.",
tableName, region);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted",
tableName, ie);
throw new InterruptedIOException("Table " + tableName
+ " in region " + region + " has not been deleted");
}
tableHandler.destroy();
}
@Retries.RetryTranslated
@ -1688,29 +1656,6 @@ private void removeAuthoritativeDirFlag(
}
}
/**
* Add tags from configuration to the existing DynamoDB table.
*/
@Retries.OnceRaw
public void tagTable() {
List<Tag> tags = new ArrayList<>();
Map <String, String> tagProperties =
conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG);
for (Map.Entry<String, String> tagMapEntry : tagProperties.entrySet()) {
Tag tag = new Tag().withKey(tagMapEntry.getKey())
.withValue(tagMapEntry.getValue());
tags.add(tag);
}
if (tags.isEmpty()) {
return;
}
TagResourceRequest tagResourceRequest = new TagResourceRequest()
.withResourceArn(table.getDescription().getTableArn())
.withTags(tags);
getAmazonDynamoDB().tagResource(tagResourceRequest);
}
@VisibleForTesting
public AmazonDynamoDB getAmazonDynamoDB() {
return amazonDynamoDB;
@ -1721,7 +1666,7 @@ public String toString() {
return getClass().getSimpleName() + '{'
+ "region=" + region
+ ", tableName=" + tableName
+ ", tableArn=" + tableArn
+ ", tableArn=" + tableHandler.getTableArn()
+ '}';
}
@ -1735,275 +1680,20 @@ public String toString() {
@Override
public List<RoleModel.Statement> listAWSPolicyRules(
final Set<AccessLevel> access) {
Preconditions.checkState(tableArn != null, "TableARN not known");
Preconditions.checkState(tableHandler.getTableArn() != null,
"TableARN not known");
if (access.isEmpty()) {
return Collections.emptyList();
}
RoleModel.Statement stat;
if (access.contains(AccessLevel.ADMIN)) {
stat = allowAllDynamoDBOperations(tableArn);
stat = allowAllDynamoDBOperations(tableHandler.getTableArn());
} else {
stat = allowS3GuardClientOperations(tableArn);
stat = allowS3GuardClientOperations(tableHandler.getTableArn());
}
return Lists.newArrayList(stat);
}
/**
* Create a table if it does not exist and wait for it to become active.
*
* If a table with the intended name already exists, then it uses that table.
* Otherwise, it will automatically create the table if the config
* {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is
* enabled. The DynamoDB table creation API is asynchronous. This method wait
* for the table to become active after sending the creation request, so
* overall, this method is synchronous, and the table is guaranteed to exist
* after this method returns successfully.
*
* The wait for a table becoming active is Retry+Translated; it can fail
* while a table is not yet ready.
*
* @throws IOException if table does not exist and auto-creation is disabled;
* or table is being deleted, or any other I/O exception occurred.
*/
@VisibleForTesting
@Retries.OnceRaw
void initTable() throws IOException {
table = dynamoDB.getTable(tableName);
try {
try {
LOG.debug("Binding to table {}", tableName);
TableDescription description = table.describe();
LOG.debug("Table state: {}", description);
tableArn = description.getTableArn();
final String status = description.getTableStatus();
switch (status) {
case "CREATING":
LOG.debug("Table {} in region {} is being created/updated. This may"
+ " indicate that the table is being operated by another "
+ "concurrent thread or process. Waiting for active...",
tableName, region);
waitForTableActive(table);
break;
case "DELETING":
throw new FileNotFoundException("DynamoDB table "
+ "'" + tableName + "' is being "
+ "deleted in region " + region);
case "UPDATING":
// table being updated; it can still be used.
LOG.debug("Table is being updated.");
break;
case "ACTIVE":
break;
default:
throw new IOException("Unknown DynamoDB table status " + status
+ ": tableName='" + tableName + "', region=" + region);
}
final Item versionMarker = getVersionMarkerItem();
verifyVersionCompatibility(tableName, versionMarker);
Long created = extractCreationTimeFromMarker(versionMarker);
LOG.debug("Using existing DynamoDB table {} in region {} created {}",
tableName, region, (created != null) ? new Date(created) : null);
} catch (ResourceNotFoundException rnfe) {
if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT);
long writeCapacity = conf.getLong(
S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT);
ProvisionedThroughput capacity;
if (readCapacity > 0 && writeCapacity > 0) {
capacity = new ProvisionedThroughput(
readCapacity,
writeCapacity);
} else {
// at least one capacity value is <= 0
// verify they are both exactly zero
Preconditions.checkArgument(
readCapacity == 0 && writeCapacity == 0,
"S3Guard table read capacity %d and and write capacity %d"
+ " are inconsistent", readCapacity, writeCapacity);
// and set the capacity to null for per-request billing.
capacity = null;
}
createTable(capacity);
} else {
throw (FileNotFoundException)new FileNotFoundException(
"DynamoDB table '" + tableName + "' does not "
+ "exist in region " + region + "; auto-creation is turned off")
.initCause(rnfe);
}
}
} catch (AmazonClientException e) {
throw translateException("initTable", tableName, e);
}
}
/**
* Get the version mark item in the existing DynamoDB table.
*
* As the version marker item may be created by another concurrent thread or
* process, we sleep and retry a limited number times if the lookup returns
* with a null value.
* DDB throttling is always retried.
*/
@VisibleForTesting
@Retries.RetryTranslated
Item getVersionMarkerItem() throws IOException {
final PrimaryKey versionMarkerKey =
createVersionMarkerPrimaryKey(VERSION_MARKER);
int retryCount = 0;
// look for a version marker, with usual throttling/failure retries.
Item versionMarker = queryVersionMarker(versionMarkerKey);
while (versionMarker == null) {
// The marker was null.
// Two possibilities
// 1. This isn't a S3Guard table.
// 2. This is a S3Guard table in construction; another thread/process
// is about to write/actively writing the version marker.
// So that state #2 is handled, batchWriteRetryPolicy is used to manage
// retries.
// This will mean that if the cause is actually #1, failure will not
// be immediate. As this will ultimately result in a failure to
// init S3Guard and the S3A FS, this isn't going to be a performance
// bottleneck -simply a slightly slower failure report than would otherwise
// be seen.
// "if your settings are broken, performance is not your main issue"
try {
RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry(null,
retryCount, 0, true);
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
break;
} else {
LOG.debug("Sleeping {} ms before next retry", action.delayMillis);
Thread.sleep(action.delayMillis);
}
} catch (Exception e) {
throw new IOException("initTable: Unexpected exception " + e, e);
}
retryCount++;
versionMarker = queryVersionMarker(versionMarkerKey);
}
return versionMarker;
}
/**
* Issue the query to get the version marker, with throttling for overloaded
* DDB tables.
* @param versionMarkerKey key to look up
* @return the marker
* @throws IOException failure
*/
@Retries.RetryTranslated
private Item queryVersionMarker(final PrimaryKey versionMarkerKey)
throws IOException {
return readOp.retry("getVersionMarkerItem",
VERSION_MARKER, true,
() -> table.getItem(versionMarkerKey));
}
/**
* Verify that a table version is compatible with this S3Guard client.
* @param tableName name of the table (for error messages)
* @param versionMarker the version marker retrieved from the table
* @throws IOException on any incompatibility
*/
@VisibleForTesting
static void verifyVersionCompatibility(String tableName,
Item versionMarker) throws IOException {
if (versionMarker == null) {
LOG.warn("Table {} contains no version marker", tableName);
throw new IOException(E_NO_VERSION_MARKER
+ " Table: " + tableName);
} else {
final int version = extractVersionFromMarker(versionMarker);
if (VERSION != version) {
// version mismatch. Unless/until there is support for
// upgrading versions, treat this as an incompatible change
// and fail.
throw new IOException(E_INCOMPATIBLE_VERSION
+ " Table "+ tableName
+ " Expected version " + VERSION + " actual " + version);
}
}
}
/**
* Wait for table being active.
* @param t table to block on.
* @throws IOException IO problems
* @throws InterruptedIOException if the wait was interrupted
* @throws IllegalArgumentException if an exception was raised in the waiter
*/
@Retries.RetryTranslated
private void waitForTableActive(Table t) throws IOException {
invoker.retry("Waiting for active state of table " + tableName,
null,
true,
() -> {
try {
t.waitForActive();
} catch (IllegalArgumentException ex) {
throw translateTableWaitFailure(tableName, ex);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for table {} in region {}"
+ " active",
tableName, region, e);
Thread.currentThread().interrupt();
throw (InterruptedIOException)
new InterruptedIOException("DynamoDB table '"
+ tableName + "' is not active yet in region " + region)
.initCause(e);
}
});
}
/**
* Create a table, wait for it to become active, then add the version
* marker.
* Creating an setting up the table isn't wrapped by any retry operations;
* the wait for a table to become available is RetryTranslated.
* @param capacity capacity to provision. If null: create a per-request
* table.
* @throws IOException on any failure.
* @throws InterruptedIOException if the wait was interrupted
*/
@Retries.OnceRaw
private void createTable(ProvisionedThroughput capacity) throws IOException {
try {
String mode;
CreateTableRequest request = new CreateTableRequest()
.withTableName(tableName)
.withKeySchema(keySchema())
.withAttributeDefinitions(attributeDefinitions());
if (capacity != null) {
mode = String.format("with provisioned read capacity %d and"
+ " write capacity %s",
capacity.getReadCapacityUnits(), capacity.getWriteCapacityUnits());
request.withProvisionedThroughput(capacity);
} else {
mode = "with pay-per-request billing";
request.withBillingMode(BillingMode.PAY_PER_REQUEST);
}
LOG.info("Creating non-existent DynamoDB table {} in region {} {}",
tableName, region, mode);
table = dynamoDB.createTable(request);
LOG.debug("Awaiting table becoming active");
} catch (ResourceInUseException e) {
LOG.warn("ResourceInUseException while creating DynamoDB table {} "
+ "in region {}. This may indicate that the table was "
+ "created by another concurrent thread or process.",
tableName, region);
}
waitForTableActive(table);
final Item marker = createVersionMarker(VERSION_MARKER, VERSION,
System.currentTimeMillis());
putItem(marker);
tagTable();
}
/**
* PUT a single item to the table.
* @param item item to put
@ -2015,47 +1705,6 @@ private PutItemOutcome putItem(Item item) {
return table.putItem(item);
}
/**
* Provision the table with given read and write capacity units.
* Call will fail if the table is busy, or the new values match the current
* ones.
* <p>
* Until the AWS SDK lets us switch a table to on-demand, an attempt to
* set the I/O capacity to zero will fail.
* @param readCapacity read units: must be greater than zero
* @param writeCapacity write units: must be greater than zero
* @throws IOException on a failure
*/
@Retries.RetryTranslated
void provisionTable(Long readCapacity, Long writeCapacity)
throws IOException {
if (readCapacity == 0 || writeCapacity == 0) {
// table is pay on demand
throw new IOException(E_ON_DEMAND_NO_SET_CAPACITY);
}
final ProvisionedThroughput toProvision = new ProvisionedThroughput()
.withReadCapacityUnits(readCapacity)
.withWriteCapacityUnits(writeCapacity);
invoker.retry("ProvisionTable", tableName, true,
() -> {
final ProvisionedThroughputDescription p =
table.updateTable(toProvision).getProvisionedThroughput();
LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
+ "writeCapacityUnits={}",
tableName, region, p.getReadCapacityUnits(),
p.getWriteCapacityUnits());
});
}
@Retries.RetryTranslated
@VisibleForTesting
void provisionTableBlocking(Long readCapacity, Long writeCapacity)
throws IOException {
provisionTable(readCapacity, writeCapacity);
waitForTableActive(table);
}
@VisibleForTesting
Table getTable() {
return table;
@ -2175,7 +1824,7 @@ public void updateParameters(Map<String, String> parameters)
currentRead, currentWrite);
LOG.info("Changing capacity of table to read: {}, write: {}",
newRead, newWrite);
provisionTableBlocking(newRead, newWrite);
tableHandler.provisionTableBlocking(newRead, newWrite);
} else {
LOG.info("Table capacity unchanged at read: {}, write: {}",
newRead, newWrite);
@ -2374,48 +2023,6 @@ String getUsername() {
return username;
}
/**
* Take an {@code IllegalArgumentException} raised by a DDB operation
* and if it contains an inner SDK exception, unwrap it.
* @param ex exception.
* @return the inner AWS exception or null.
*/
public static SdkBaseException extractInnerException(
IllegalArgumentException ex) {
if (ex.getCause() instanceof SdkBaseException) {
return (SdkBaseException) ex.getCause();
} else {
return null;
}
}
/**
* Handle a table wait failure by extracting any inner cause and
* converting it, or, if unconvertable by wrapping
* the IllegalArgumentException in an IOE.
*
* @param name name of the table
* @param e exception
* @return an IOE to raise.
*/
@VisibleForTesting
static IOException translateTableWaitFailure(
final String name, IllegalArgumentException e) {
final SdkBaseException ex = extractInnerException(e);
if (ex != null) {
if (ex instanceof WaiterTimedOutException) {
// a timeout waiting for state change: extract the
// message from the outer exception, but translate
// the inner one for the throttle policy.
return new AWSClientIOException(e.getMessage(), ex);
} else {
return translateException(e.getMessage(), name, ex);
}
} else {
return new IOException(e);
}
}
/**
* Log a PUT into the operations log at debug level.
* @param state optional ancestor state.
@ -2691,4 +2298,9 @@ private static String stateAsString(@Nullable AncestorState state) {
return stateStr;
}
}
protected DynamoDBMetadataStoreTableManager getTableHandler() {
Preconditions.checkNotNull(tableHandler, "Not initialized");
return tableHandler;
}
}

View File

@ -0,0 +1,693 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.amazonaws.AmazonClientException;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.model.Tag;
import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
import com.amazonaws.waiters.WaiterTimedOutException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSClientIOException;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import static java.lang.String.valueOf;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CREATE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_ON_DEMAND_NO_SET_CAPACITY;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_TAG_NAME;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.attributeDefinitions;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarker;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarkerPrimaryKey;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractCreationTimeFromMarker;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractVersionFromMarker;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema;
/**
* Managing dynamo tables for S3Guard dynamodb based metadatastore.
* Factored out from DynamoDBMetadataStore.
*/
public class DynamoDBMetadataStoreTableManager {
public static final Logger LOG = LoggerFactory.getLogger(
DynamoDBMetadataStoreTableManager.class);
/** Error: version marker not found in table but the table is not empty. */
public static final String E_NO_VERSION_MARKER_AND_NOT_EMPTY
= "S3Guard table lacks version marker, and it is not empty.";
/** Error: version mismatch. */
public static final String E_INCOMPATIBLE_TAG_VERSION
= "Database table is from an incompatible S3Guard version based on table TAG.";
/** Error: version mismatch. */
public static final String E_INCOMPATIBLE_ITEM_VERSION
= "Database table is from an incompatible S3Guard version based on table ITEM.";
/** Invoker for IO. Until configured properly, use try-once. */
private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
Invoker.NO_OP
);
final private AmazonDynamoDB amazonDynamoDB;
final private DynamoDB dynamoDB;
final private String tableName;
final private String region;
final private Configuration conf;
final private Invoker readOp;
final private RetryPolicy batchWriteRetryPolicy;
private Table table;
private String tableArn;
public DynamoDBMetadataStoreTableManager(DynamoDB dynamoDB,
String tableName,
String region,
AmazonDynamoDB amazonDynamoDB,
Configuration conf,
Invoker readOp,
RetryPolicy batchWriteCapacityExceededEvents) {
this.dynamoDB = dynamoDB;
this.amazonDynamoDB = amazonDynamoDB;
this.tableName = tableName;
this.region = region;
this.conf = conf;
this.readOp = readOp;
this.batchWriteRetryPolicy = batchWriteCapacityExceededEvents;
}
/**
* Create a table if it does not exist and wait for it to become active.
*
* If a table with the intended name already exists, then it uses that table.
* Otherwise, it will automatically create the table if the config
* {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is
* enabled. The DynamoDB table creation API is asynchronous. This method wait
* for the table to become active after sending the creation request, so
* overall, this method is synchronous, and the table is guaranteed to exist
* after this method returns successfully.
*
* The wait for a table becoming active is Retry+Translated; it can fail
* while a table is not yet ready.
*
* @throws IOException if table does not exist and auto-creation is disabled;
* or table is being deleted, or any other I/O exception occurred.
*/
@VisibleForTesting
@Retries.RetryTranslated
Table initTable() throws IOException {
table = dynamoDB.getTable(tableName);
try {
try {
LOG.debug("Binding to table {}", tableName);
TableDescription description = table.describe();
LOG.debug("Table state: {}", description);
tableArn = description.getTableArn();
final String status = description.getTableStatus();
switch (status) {
case "CREATING":
LOG.debug("Table {} in region {} is being created/updated. This may"
+ " indicate that the table is being operated by another "
+ "concurrent thread or process. Waiting for active...",
tableName, region);
waitForTableActive(table);
break;
case "DELETING":
throw new FileNotFoundException("DynamoDB table "
+ "'" + tableName + "' is being "
+ "deleted in region " + region);
case "UPDATING":
// table being updated; it can still be used.
LOG.debug("Table is being updated.");
break;
case "ACTIVE":
break;
default:
throw new IOException("Unknown DynamoDB table status " + status
+ ": tableName='" + tableName + "', region=" + region);
}
verifyVersionCompatibility();
final Item versionMarker = getVersionMarkerItem();
Long created = extractCreationTimeFromMarker(versionMarker);
LOG.debug("Using existing DynamoDB table {} in region {} created {}",
tableName, region, (created != null) ? new Date(created) : null);
} catch (ResourceNotFoundException rnfe) {
if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT);
long writeCapacity = conf.getLong(
S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT);
ProvisionedThroughput capacity;
if (readCapacity > 0 && writeCapacity > 0) {
capacity = new ProvisionedThroughput(
readCapacity,
writeCapacity);
} else {
// at least one capacity value is <= 0
// verify they are both exactly zero
Preconditions.checkArgument(
readCapacity == 0 && writeCapacity == 0,
"S3Guard table read capacity %d and and write capacity %d"
+ " are inconsistent", readCapacity, writeCapacity);
// and set the capacity to null for per-request billing.
capacity = null;
}
createTable(capacity);
} else {
throw (FileNotFoundException) new FileNotFoundException(
"DynamoDB table '" + tableName + "' does not "
+ "exist in region " + region +
"; auto-creation is turned off")
.initCause(rnfe);
}
}
} catch (AmazonClientException e) {
throw translateException("initTable", tableName, e);
}
return table;
}
protected void tagTableWithVersionMarker() {
try {
TagResourceRequest tagResourceRequest = new TagResourceRequest()
.withResourceArn(table.getDescription().getTableArn())
.withTags(newVersionMarkerTag());
amazonDynamoDB.tagResource(tagResourceRequest);
} catch (AmazonDynamoDBException e) {
LOG.warn("Exception during tagging table: {}", e.getMessage());
}
}
protected static Item getVersionMarkerFromTags(Table table,
AmazonDynamoDB addb) {
List<Tag> tags = null;
try {
final TableDescription description = table.describe();
ListTagsOfResourceRequest listTagsOfResourceRequest =
new ListTagsOfResourceRequest()
.withResourceArn(description.getTableArn());
tags = addb.listTagsOfResource(listTagsOfResourceRequest).getTags();
} catch (ResourceNotFoundException e) {
LOG.error("Table: {} not found.", table.getTableName());
throw e;
} catch (AmazonDynamoDBException e) {
LOG.warn("Exception while getting tags from the dynamo table: {}",
e.getMessage());
}
if (tags == null) {
return null;
}
final Optional<Tag> first = tags.stream()
.filter(tag -> tag.getKey().equals(VERSION_MARKER_TAG_NAME)).findFirst();
if (first.isPresent()) {
final Tag vmTag = first.get();
return createVersionMarker(
vmTag.getKey(), Integer.parseInt(vmTag.getValue()), 0
);
} else {
return null;
}
}
/**
* Create a table, wait for it to become active, then add the version
* marker.
* Creating an setting up the table isn't wrapped by any retry operations;
* the wait for a table to become available is RetryTranslated.
* The tags are added to the table during creation, not after creation.
* We can assume that tagging and creating the table is a single atomic
* operation.
*
* @param capacity capacity to provision. If null: create a per-request
* table.
* @throws IOException on any failure.
* @throws InterruptedIOException if the wait was interrupted
*/
@Retries.OnceMixed
private void createTable(ProvisionedThroughput capacity) throws IOException {
try {
String mode;
CreateTableRequest request = new CreateTableRequest()
.withTableName(tableName)
.withKeySchema(keySchema())
.withAttributeDefinitions(attributeDefinitions())
.withTags(getTableTagsFromConfig());
if (capacity != null) {
mode = String.format("with provisioned read capacity %d and"
+ " write capacity %s",
capacity.getReadCapacityUnits(), capacity.getWriteCapacityUnits());
request.withProvisionedThroughput(capacity);
} else {
mode = "with pay-per-request billing";
request.withBillingMode(BillingMode.PAY_PER_REQUEST);
}
LOG.info("Creating non-existent DynamoDB table {} in region {} {}",
tableName, region, mode);
table = dynamoDB.createTable(request);
LOG.debug("Awaiting table becoming active");
} catch (ResourceInUseException e) {
LOG.warn("ResourceInUseException while creating DynamoDB table {} "
+ "in region {}. This may indicate that the table was "
+ "created by another concurrent thread or process.",
tableName, region);
}
waitForTableActive(table);
putVersionMarkerItemToTable();
}
/**
* Return tags from configuration and the version marker for adding to
* dynamo table during creation.
*/
@Retries.OnceRaw
public List<Tag> getTableTagsFromConfig() {
List<Tag> tags = new ArrayList<>();
// from configuration
Map<String, String> tagProperties =
conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG);
for (Map.Entry<String, String> tagMapEntry : tagProperties.entrySet()) {
Tag tag = new Tag().withKey(tagMapEntry.getKey())
.withValue(tagMapEntry.getValue());
tags.add(tag);
}
// add the version marker
tags.add(newVersionMarkerTag());
return tags;
}
/**
* Create a new version marker tag.
* @return a new version marker tag
*/
private static Tag newVersionMarkerTag() {
return new Tag().withKey(VERSION_MARKER_TAG_NAME).withValue(valueOf(VERSION));
}
/**
* Verify that a table version is compatible with this S3Guard client.
*
* Checks for consistency between the version marker as the item and tag.
*
* <pre>
* 1. If the table lacks both version markers AND it's empty,
* both markers will be added.
* If the table is not empty the check throws IOException
* 2. If there's no version marker ITEM, the compatibility with the TAG
* will be checked, and the version marker ITEM will be added if the
* TAG version is compatible.
* If the TAG version is not compatible, the check throws OException
* 3. If there's no version marker TAG, the compatibility with the ITEM
* version marker will be checked, and the version marker ITEM will be
* added if the ITEM version is compatible.
* If the ITEM version is not compatible, the check throws IOException
* 4. If the TAG and ITEM versions are both present then both will be checked
* for compatibility. If the ITEM or TAG version marker is not compatible,
* the check throws IOException
* </pre>
*
* @throws IOException on any incompatibility
*/
@VisibleForTesting
protected void verifyVersionCompatibility() throws IOException {
final Item versionMarkerItem = getVersionMarkerItem();
final Item versionMarkerFromTag =
getVersionMarkerFromTags(table, amazonDynamoDB);
LOG.debug("versionMarkerItem: {}; versionMarkerFromTag: {}",
versionMarkerItem, versionMarkerFromTag);
if (versionMarkerItem == null && versionMarkerFromTag == null) {
if (!isEmptyTable(tableName, amazonDynamoDB)) {
LOG.error("Table is not empty but missing the version maker. Failing.");
throw new IOException(E_NO_VERSION_MARKER_AND_NOT_EMPTY
+ " Table: " + tableName);
}
LOG.info("Table {} contains no version marker item or tag. " +
"The table is empty, so the version marker will be added " +
"as TAG and ITEM.", tableName);
tagTableWithVersionMarker();
putVersionMarkerItemToTable();
}
if (versionMarkerItem == null && versionMarkerFromTag != null) {
final int tagVersionMarker =
extractVersionFromMarker(versionMarkerFromTag);
throwExceptionOnVersionMismatch(tagVersionMarker, tableName,
E_INCOMPATIBLE_TAG_VERSION);
LOG.info("Table {} contains no version marker ITEM but contains " +
"compatible version marker TAG. Restoring the version marker " +
"item from tag.", tableName);
putVersionMarkerItemToTable();
}
if (versionMarkerItem != null && versionMarkerFromTag == null) {
final int itemVersionMarker =
extractVersionFromMarker(versionMarkerItem);
throwExceptionOnVersionMismatch(itemVersionMarker, tableName,
E_INCOMPATIBLE_ITEM_VERSION);
LOG.info("Table {} contains no version marker TAG but contains " +
"compatible version marker ITEM. Restoring the version marker " +
"item from item.", tableName);
tagTableWithVersionMarker();
}
if (versionMarkerItem != null && versionMarkerFromTag != null) {
final int tagVersionMarker =
extractVersionFromMarker(versionMarkerFromTag);
final int itemVersionMarker =
extractVersionFromMarker(versionMarkerItem);
throwExceptionOnVersionMismatch(tagVersionMarker, tableName,
E_INCOMPATIBLE_TAG_VERSION);
throwExceptionOnVersionMismatch(itemVersionMarker, tableName,
E_INCOMPATIBLE_ITEM_VERSION);
LOG.debug("Table {} contains correct version marker TAG and ITEM.",
tableName);
}
}
private static boolean isEmptyTable(String tableName, AmazonDynamoDB aadb) {
final ScanRequest req = new ScanRequest().withTableName(
tableName).withLimit(1);
final ScanResult result = aadb.scan(req);
return result.getCount() == 0;
}
private static void throwExceptionOnVersionMismatch(int actual,
String tableName,
String exMsg) throws IOException {
if (VERSION != actual) {
throw new IOException(exMsg + " Table " + tableName
+ " Expected version: " + VERSION + " actual tag version: " +
actual);
}
}
/**
* Add version marker to the dynamo table.
*/
@Retries.OnceRaw
private void putVersionMarkerItemToTable() {
final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION,
System.currentTimeMillis());
putItem(marker);
}
/**
* Wait for table being active.
* @param t table to block on.
* @throws IOException IO problems
* @throws InterruptedIOException if the wait was interrupted
* @throws IllegalArgumentException if an exception was raised in the waiter
*/
@Retries.RetryTranslated
private void waitForTableActive(Table t) throws IOException {
invoker.retry("Waiting for active state of table " + tableName,
null,
true,
() -> {
try {
t.waitForActive();
} catch (IllegalArgumentException ex) {
throw translateTableWaitFailure(tableName, ex);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for table {} in region {}"
+ " active",
tableName, region, e);
Thread.currentThread().interrupt();
throw (InterruptedIOException)
new InterruptedIOException("DynamoDB table '"
+ tableName + "' is not active yet in region " + region)
.initCause(e);
}
});
}
/**
* Handle a table wait failure by extracting any inner cause and
* converting it, or, if unconvertable by wrapping
* the IllegalArgumentException in an IOE.
*
* @param name name of the table
* @param e exception
* @return an IOE to raise.
*/
@VisibleForTesting
static IOException translateTableWaitFailure(
final String name, IllegalArgumentException e) {
final SdkBaseException ex = extractInnerException(e);
if (ex != null) {
if (ex instanceof WaiterTimedOutException) {
// a timeout waiting for state change: extract the
// message from the outer exception, but translate
// the inner one for the throttle policy.
return new AWSClientIOException(e.getMessage(), ex);
} else {
return translateException(e.getMessage(), name, ex);
}
} else {
return new IOException(e);
}
}
/**
* Take an {@code IllegalArgumentException} raised by a DDB operation
* and if it contains an inner SDK exception, unwrap it.
* @param ex exception.
* @return the inner AWS exception or null.
*/
public static SdkBaseException extractInnerException(
IllegalArgumentException ex) {
if (ex.getCause() instanceof SdkBaseException) {
return (SdkBaseException) ex.getCause();
} else {
return null;
}
}
/**
* Get the version mark item in the existing DynamoDB table.
*
* As the version marker item may be created by another concurrent thread or
* process, we sleep and retry a limited number times if the lookup returns
* with a null value.
* DDB throttling is always retried.
*/
@VisibleForTesting
@Retries.RetryTranslated
protected Item getVersionMarkerItem() throws IOException {
final PrimaryKey versionMarkerKey =
createVersionMarkerPrimaryKey(VERSION_MARKER_ITEM_NAME);
int retryCount = 0;
// look for a version marker, with usual throttling/failure retries.
Item versionMarker = queryVersionMarker(versionMarkerKey);
while (versionMarker == null) {
// The marker was null.
// Two possibilities
// 1. This isn't a S3Guard table.
// 2. This is a S3Guard table in construction; another thread/process
// is about to write/actively writing the version marker.
// So that state #2 is handled, batchWriteRetryPolicy is used to manage
// retries.
// This will mean that if the cause is actually #1, failure will not
// be immediate. As this will ultimately result in a failure to
// init S3Guard and the S3A FS, this isn't going to be a performance
// bottleneck -simply a slightly slower failure report than would otherwise
// be seen.
// "if your settings are broken, performance is not your main issue"
try {
RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry(null,
retryCount, 0, true);
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
break;
} else {
LOG.warn("No version marker found in the DynamoDB table: {}. " +
"Sleeping {} ms before next retry", tableName, action.delayMillis);
Thread.sleep(action.delayMillis);
}
} catch (Exception e) {
throw new IOException("initTable: Unexpected exception " + e, e);
}
retryCount++;
versionMarker = queryVersionMarker(versionMarkerKey);
}
return versionMarker;
}
/**
* Issue the query to get the version marker, with throttling for overloaded
* DDB tables.
* @param versionMarkerKey key to look up
* @return the marker
* @throws IOException failure
*/
@Retries.RetryTranslated
private Item queryVersionMarker(final PrimaryKey versionMarkerKey)
throws IOException {
return readOp.retry("getVersionMarkerItem",
VERSION_MARKER_ITEM_NAME, true,
() -> table.getItem(versionMarkerKey));
}
/**
* PUT a single item to the table.
* @param item item to put
* @return the outcome.
*/
@Retries.OnceRaw
private PutItemOutcome putItem(Item item) {
LOG.debug("Putting item {}", item);
return table.putItem(item);
}
/**
* Provision the table with given read and write capacity units.
* Call will fail if the table is busy, or the new values match the current
* ones.
* <p>
* Until the AWS SDK lets us switch a table to on-demand, an attempt to
* set the I/O capacity to zero will fail.
* @param readCapacity read units: must be greater than zero
* @param writeCapacity write units: must be greater than zero
* @throws IOException on a failure
*/
@Retries.RetryTranslated
void provisionTable(Long readCapacity, Long writeCapacity)
throws IOException {
if (readCapacity == 0 || writeCapacity == 0) {
// table is pay on demand
throw new IOException(E_ON_DEMAND_NO_SET_CAPACITY);
}
final ProvisionedThroughput toProvision = new ProvisionedThroughput()
.withReadCapacityUnits(readCapacity)
.withWriteCapacityUnits(writeCapacity);
invoker.retry("ProvisionTable", tableName, true,
() -> {
final ProvisionedThroughputDescription p =
table.updateTable(toProvision).getProvisionedThroughput();
LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
+ "writeCapacityUnits={}",
tableName, region, p.getReadCapacityUnits(),
p.getWriteCapacityUnits());
});
}
@Retries.RetryTranslated
public void destroy() throws IOException {
if (table == null) {
LOG.info("In destroy(): no table to delete");
return;
}
LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB");
try {
invoker.retry("delete", null, true,
() -> table.delete());
table.waitForDelete();
} catch (IllegalArgumentException ex) {
throw new TableDeleteTimeoutException(tableName,
"Timeout waiting for the table " + getTableArn()
+ " to be deleted", ex);
} catch (FileNotFoundException rnfe) {
LOG.info("FileNotFoundException while deleting DynamoDB table {} in "
+ "region {}. This may indicate that the table does not exist, "
+ "or has been deleted by another concurrent thread or process.",
tableName, region);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted",
tableName, ie);
throw new InterruptedIOException("Table " + tableName
+ " in region " + region + " has not been deleted");
}
}
@Retries.RetryTranslated
@VisibleForTesting
void provisionTableBlocking(Long readCapacity, Long writeCapacity)
throws IOException {
provisionTable(readCapacity, writeCapacity);
waitForTableActive(table);
}
public Table getTable() {
return table;
}
public String getTableArn() {
return tableArn;
}
}

View File

@ -247,7 +247,7 @@ static int extractVersionFromMarker(Item marker) throws IOException {
* @return the creation time, or null
* @throws IOException if the item is not a version marker
*/
static Long extractCreationTimeFromMarker(Item marker) throws IOException {
static Long extractCreationTimeFromMarker(Item marker) {
if (marker.hasAttribute(TABLE_CREATED)) {
return marker.getLong(TABLE_CREATED);
} else {

View File

@ -39,7 +39,7 @@
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.CHILD;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.TABLE_VERSION;
@ -199,8 +199,8 @@ public boolean hasNext() {
public DDBPathMetadata next() {
Item item = it.next();
Pair<String, String> key = primaryKey(item);
if (VERSION_MARKER.equals(key.getLeft()) &&
VERSION_MARKER.equals(key.getRight())) {
if (VERSION_MARKER_ITEM_NAME.equals(key.getLeft()) &&
VERSION_MARKER_ITEM_NAME.equals(key.getRight())) {
// a version marker is found, return the special type
return new VersionMarker(item);
} else {

View File

@ -951,9 +951,36 @@ logged.
### Versioning
S3Guard tables are created with a version marker, an entry with the primary
key and child entry of `../VERSION`; the use of a relative path guarantees
that it will not be resolved.
S3Guard tables are created with a version marker entry and table tag.
The entry is created with the primary key and child entry of `../VERSION`;
the use of a relative path guarantees that it will not be resolved.
Table tag key is named `s3guard_version`.
When the table is initialized by S3Guard, the table will be tagged during the
creating and the version marker entry will be created in the table.
If the table lacks the version marker entry or tag, S3Guard will try to create
it according to the following rules:
1. If the table lacks both version markers AND it's empty, both markers will be added.
If the table is not empty the check throws IOException
1. If there's no version marker ITEM, the compatibility with the TAG
will be checked, and the version marker ITEM will be added if the
TAG version is compatible.
If the TAG version is not compatible, the check throws OException
1. If there's no version marker TAG, the compatibility with the ITEM
version marker will be checked, and the version marker ITEM will be
added if the ITEM version is compatible.
If the ITEM version is not compatible, the check throws IOException
1. If the TAG and ITEM versions are both present then both will be checked
for compatibility. If the ITEM or TAG version marker is not compatible,
the check throws IOException
*Note*: If the user does not have sufficient rights to tag the table the
initialization of S3Guard will not fail, but there will be no version marker tag
on the dynamo table and the following message will be logged on WARN level:
```
Exception during tagging table: {AmazonDynamoDBException exception message}
```
*Versioning policy*

View File

@ -33,6 +33,7 @@
import java.util.Map;
import java.util.UUID;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
@ -41,6 +42,8 @@
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.model.Tag;
import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
import com.amazonaws.services.dynamodbv2.model.UntagResourceRequest;
import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
@ -70,10 +73,15 @@
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import static java.lang.String.valueOf;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_INCOMPATIBLE_ITEM_VERSION;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_INCOMPATIBLE_TAG_VERSION;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_NO_VERSION_MARKER_AND_NOT_EMPTY;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.getVersionMarkerFromTags;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*;
import static org.apache.hadoop.test.LambdaTestUtils.*;
@ -110,10 +118,11 @@ public ITestDynamoDBMetadataStore() {
LoggerFactory.getLogger(ITestDynamoDBMetadataStore.class);
public static final PrimaryKey
VERSION_MARKER_PRIMARY_KEY = createVersionMarkerPrimaryKey(
DynamoDBMetadataStore.VERSION_MARKER);
DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME);
private S3AFileSystem fileSystem;
private S3AContract s3AContract;
private DynamoDBMetadataStoreTableManager tableHandler;
private URI fsUri;
@ -153,6 +162,7 @@ public void setUp() throws Exception {
try{
super.setUp();
tableHandler = getDynamoMetadataStore().getTableHandler();
} catch (FileNotFoundException e){
LOG.warn("MetadataStoreTestBase setup failed. Waiting for table to be "
+ "deleted before trying again.");
@ -613,31 +623,10 @@ public void testInitExistingTable() throws IOException {
final String tableName = ddbms.getTable().getTableName();
verifyTableInitialized(tableName, ddbms.getDynamoDB());
// create existing table
ddbms.initTable();
tableHandler.initTable();
verifyTableInitialized(tableName, ddbms.getDynamoDB());
}
/**
* Test the low level version check code.
*/
@Test
public void testItemVersionCompatibility() throws Throwable {
verifyVersionCompatibility("table",
createVersionMarker(VERSION_MARKER, VERSION, 0));
}
/**
* Test that a version marker entry without the version number field
* is rejected as incompatible with a meaningful error message.
*/
@Test
public void testItemLacksVersion() throws Throwable {
intercept(IOException.class, E_NOT_VERSION_MARKER,
() -> verifyVersionCompatibility("table",
new Item().withPrimaryKey(
createVersionMarkerPrimaryKey(VERSION_MARKER))));
}
/**
* Test versioning handling.
* <ol>
@ -663,43 +652,118 @@ public void testTableVersioning() throws Exception {
DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore();
try {
ddbms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
DynamoDBMetadataStoreTableManager localTableHandler =
ddbms.getTableHandler();
Table table = verifyTableInitialized(tableName, ddbms.getDynamoDB());
// check the tagging too
// check the tagging
verifyStoreTags(createTagMap(), ddbms);
// check version compatibility
checkVerifyVersionMarkerCompatibility(localTableHandler, table);
Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY);
table.deleteItem(VERSION_MARKER_PRIMARY_KEY);
// create existing table
intercept(IOException.class, E_NO_VERSION_MARKER,
() -> ddbms.initTable());
// now add a different version marker
Item v200 = createVersionMarker(VERSION_MARKER, VERSION * 2, 0);
table.putItem(v200);
// create existing table
intercept(IOException.class, E_INCOMPATIBLE_VERSION,
() -> ddbms.initTable());
// create a marker with no version and expect failure
final Item invalidMarker = new Item().withPrimaryKey(
createVersionMarkerPrimaryKey(VERSION_MARKER))
.withLong(TABLE_CREATED, 0);
table.putItem(invalidMarker);
intercept(IOException.class, E_NOT_VERSION_MARKER,
() -> ddbms.initTable());
// reinstate the version marker
table.putItem(originalVersionMarker);
ddbms.initTable();
conf.setInt(S3GUARD_DDB_MAX_RETRIES, maxRetries);
} finally {
destroy(ddbms);
}
}
private void checkVerifyVersionMarkerCompatibility(
DynamoDBMetadataStoreTableManager localTableHandler, Table table)
throws Exception {
final AmazonDynamoDB addb
= getDynamoMetadataStore().getAmazonDynamoDB();
Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY);
LOG.info("1/6: remove version marker and tags from table " +
"the table is empty, so it should be initialized after the call");
deleteVersionMarkerItem(table);
removeVersionMarkerTag(table, addb);
localTableHandler.initTable();
final int versionFromItem = extractVersionFromMarker(
localTableHandler.getVersionMarkerItem());
final int versionFromTag = extractVersionFromMarker(
getVersionMarkerFromTags(table, addb));
assertEquals("Table should be tagged with the right version.",
VERSION, versionFromTag);
assertEquals("Table should have the right version marker.",
VERSION, versionFromItem);
LOG.info("2/6: if the table is not empty and there's no version marker " +
"it should fail");
deleteVersionMarkerItem(table);
removeVersionMarkerTag(table, addb);
String testKey = "coffee";
Item wrongItem =
createVersionMarker(testKey, VERSION * 2, 0);
table.putItem(wrongItem);
intercept(IOException.class, E_NO_VERSION_MARKER_AND_NOT_EMPTY,
() -> localTableHandler.initTable());
LOG.info("3/6: table has only version marker item then it will be tagged");
table.putItem(originalVersionMarker);
localTableHandler.initTable();
final int versionFromTag2 = extractVersionFromMarker(
getVersionMarkerFromTags(table, addb));
assertEquals("Table should have the right version marker tag " +
"if there was a version item.", VERSION, versionFromTag2);
LOG.info("4/6: table has only version marker tag then the version marker " +
"item will be created.");
deleteVersionMarkerItem(table);
removeVersionMarkerTag(table, addb);
localTableHandler.tagTableWithVersionMarker();
localTableHandler.initTable();
final int versionFromItem2 = extractVersionFromMarker(
localTableHandler.getVersionMarkerItem());
assertEquals("Table should have the right version marker item " +
"if there was a version tag.", VERSION, versionFromItem2);
LOG.info("5/6: add a different marker tag to the table: init should fail");
deleteVersionMarkerItem(table);
removeVersionMarkerTag(table, addb);
Item v200 = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION * 2, 0);
table.putItem(v200);
intercept(IOException.class, E_INCOMPATIBLE_ITEM_VERSION,
() -> localTableHandler.initTable());
LOG.info("6/6: add a different marker item to the table: init should fail");
deleteVersionMarkerItem(table);
removeVersionMarkerTag(table, addb);
int wrongVersion = VERSION + 3;
tagTableWithCustomVersion(table, addb, wrongVersion);
intercept(IOException.class, E_INCOMPATIBLE_TAG_VERSION,
() -> localTableHandler.initTable());
// CLEANUP
table.putItem(originalVersionMarker);
localTableHandler.tagTableWithVersionMarker();
localTableHandler.initTable();
}
private void tagTableWithCustomVersion(Table table,
AmazonDynamoDB addb,
int wrongVersion) {
final Tag vmTag = new Tag().withKey(VERSION_MARKER_TAG_NAME)
.withValue(valueOf(wrongVersion));
TagResourceRequest tagResourceRequest = new TagResourceRequest()
.withResourceArn(table.getDescription().getTableArn())
.withTags(vmTag);
addb.tagResource(tagResourceRequest);
}
private void removeVersionMarkerTag(Table table, AmazonDynamoDB addb) {
addb.untagResource(new UntagResourceRequest()
.withResourceArn(table.describe().getTableArn())
.withTagKeys(VERSION_MARKER_TAG_NAME));
}
private void deleteVersionMarkerItem(Table table) {
table.deleteItem(VERSION_MARKER_PRIMARY_KEY);
assertNull("Version marker should be null after deleting it " +
"from the table.", table.getItem(VERSION_MARKER_PRIMARY_KEY));
}
/**
* Test that initTable fails with IOException when table does not exist and
* table auto-creation is disabled.
@ -952,8 +1016,11 @@ protected void verifyStoreTags(final Map<String, String> tagMap,
tags.forEach(t -> actual.put(t.getKey(), t.getValue()));
Assertions.assertThat(actual)
.describedAs("Tags from DDB table")
.containsExactlyEntriesOf(tagMap);
assertEquals(tagMap.size(), tags.size());
.containsAllEntriesOf(tagMap);
// The version marker is always there in the tags.
// We have a plus one in tags we expect.
assertEquals(tagMap.size() + 1, tags.size());
}
protected List<Tag> listTagsOfStore(final DynamoDBMetadataStore store) {

View File

@ -91,6 +91,7 @@ public class ITestDynamoDBMetadataStoreScale
private static final long MAXIMUM_WRITE_CAPACITY = 15;
private DynamoDBMetadataStore ddbms;
private DynamoDBMetadataStoreTableManager tableHandler;
private DynamoDB ddb;
@ -160,6 +161,7 @@ public void setup() throws Exception {
super.setup();
ddbms = (DynamoDBMetadataStore) createMetadataStore();
tableName = ddbms.getTableName();
tableHandler = ddbms.getTableHandler();
assertNotNull("table has no name", tableName);
ddb = ddbms.getDynamoDB();
table = ddb.getTable(tableName);
@ -325,7 +327,7 @@ public void test_050_getVersionMarkerItem() throws Throwable {
execute("get",
OPERATIONS_PER_THREAD * 2,
expectThrottling(),
() -> ddbms.getVersionMarkerItem()
() -> tableHandler.getVersionMarkerItem()
);
}

View File

@ -160,8 +160,13 @@ public void testDynamoTableTagging() throws Exception {
List<Tag> tags = ddbms.getAmazonDynamoDB().listTagsOfResource(listTagsOfResourceRequest).getTags();
// assert
assertEquals(tagMap.size(), tags.size());
// table version is always there as a plus one tag.
assertEquals(tagMap.size() + 1, tags.size());
for (Tag tag : tags) {
// skip the version marker tag
if (tag.getKey().equals(VERSION_MARKER_TAG_NAME)) {
continue;
}
Assert.assertEquals(tagMap.get(tag.getKey()), tag.getValue());
}
// be sure to clean up - delete table

View File

@ -32,10 +32,10 @@
import org.apache.hadoop.test.HadoopTestBase;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.translateTableWaitFailure;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.translateTableWaitFailure;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**

View File

@ -51,7 +51,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION;
import static org.mockito.Mockito.never;
@ -272,14 +272,14 @@ private static void doTestPathToKey(Path path) {
@Test
public void testVersionRoundTrip() throws Throwable {
final Item marker = createVersionMarker(VERSION_MARKER, VERSION, 0);
final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, 0);
assertEquals("Extracted version from " + marker,
VERSION, extractVersionFromMarker(marker));
}
@Test
public void testVersionMarkerNotStatusIllegalPath() throws Throwable {
final Item marker = createVersionMarker(VERSION_MARKER, VERSION, 0);
final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, 0);
assertNull("Path metadata fromfrom " + marker,
itemToPathMetadata(marker, "alice"));
}