HADOOP-14734 add option to tag DDB table(s) created. (Contributed by Gabor Bota and Abe Fine)
This commit is contained in:
parent
c18eb97801
commit
d32a8d5d58
@ -411,6 +411,16 @@ private Constants() {
|
||||
public static final String S3GUARD_DDB_TABLE_NAME_KEY =
|
||||
"fs.s3a.s3guard.ddb.table";
|
||||
|
||||
/**
|
||||
* A prefix for adding tags to the DDB Table upon creation.
|
||||
*
|
||||
* For example:
|
||||
* fs.s3a.s3guard.ddb.table.tag.mytag
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public static final String S3GUARD_DDB_TABLE_TAG =
|
||||
"fs.s3a.s3guard.ddb.table.tag.";
|
||||
|
||||
/**
|
||||
* Test table name to use during DynamoDB integration test.
|
||||
*
|
||||
|
@ -61,6 +61,8 @@
|
||||
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
|
||||
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
|
||||
import com.amazonaws.services.dynamodbv2.model.TableDescription;
|
||||
import com.amazonaws.services.dynamodbv2.model.Tag;
|
||||
import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
|
||||
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -215,6 +217,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||
private static ValueMap deleteTrackingValueMap =
|
||||
new ValueMap().withBoolean(":false", false);
|
||||
|
||||
private AmazonDynamoDB amazonDynamoDB;
|
||||
private DynamoDB dynamoDB;
|
||||
private AWSCredentialProviderList credentials;
|
||||
private String region;
|
||||
@ -257,21 +260,22 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||
* @return DynamoDB instance.
|
||||
* @throws IOException I/O error.
|
||||
*/
|
||||
private static DynamoDB createDynamoDB(
|
||||
private DynamoDB createDynamoDB(
|
||||
final Configuration conf,
|
||||
final String s3Region,
|
||||
final String bucket,
|
||||
final AWSCredentialsProvider credentials)
|
||||
throws IOException {
|
||||
if (amazonDynamoDB == null) {
|
||||
Preconditions.checkNotNull(conf);
|
||||
final Class<? extends DynamoDBClientFactory> cls = conf.getClass(
|
||||
S3GUARD_DDB_CLIENT_FACTORY_IMPL,
|
||||
S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
|
||||
DynamoDBClientFactory.class);
|
||||
final Class<? extends DynamoDBClientFactory> cls =
|
||||
conf.getClass(S3GUARD_DDB_CLIENT_FACTORY_IMPL,
|
||||
S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT, DynamoDBClientFactory.class);
|
||||
LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region);
|
||||
final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
|
||||
amazonDynamoDB = ReflectionUtils.newInstance(cls, conf)
|
||||
.createDynamoDBClient(s3Region, bucket, credentials);
|
||||
return new DynamoDB(dynamoDBClient);
|
||||
}
|
||||
return new DynamoDB(amazonDynamoDB);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -978,6 +982,34 @@ private void removeAuthoritativeDirFlag(Set<Path> pathSet)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add tags from configuration to the existing DynamoDB table.
|
||||
*/
|
||||
@Retries.OnceRaw
|
||||
public void tagTable() {
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
Map <String, String> tagProperties =
|
||||
conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG);
|
||||
for (Map.Entry<String, String> tagMapEntry : tagProperties.entrySet()) {
|
||||
Tag tag = new Tag().withKey(tagMapEntry.getKey())
|
||||
.withValue(tagMapEntry.getValue());
|
||||
tags.add(tag);
|
||||
}
|
||||
if (tags.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
TagResourceRequest tagResourceRequest = new TagResourceRequest()
|
||||
.withResourceArn(table.getDescription().getTableArn())
|
||||
.withTags(tags);
|
||||
getAmazonDynamoDB().tagResource(tagResourceRequest);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public AmazonDynamoDB getAmazonDynamoDB() {
|
||||
return amazonDynamoDB;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName() + '{'
|
||||
@ -1166,6 +1198,7 @@ private void createTable(ProvisionedThroughput capacity) throws IOException {
|
||||
final Item marker = createVersionMarker(VERSION_MARKER, VERSION,
|
||||
System.currentTimeMillis());
|
||||
putItem(marker);
|
||||
tagTable();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -118,6 +118,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||
public static final String REGION_FLAG = "region";
|
||||
public static final String READ_FLAG = "read";
|
||||
public static final String WRITE_FLAG = "write";
|
||||
public static final String TAG_FLAG = "tag";
|
||||
|
||||
/**
|
||||
* Constructor a S3Guard tool with HDFS configuration.
|
||||
@ -382,6 +383,7 @@ static class Init extends S3GuardTool {
|
||||
" -" + REGION_FLAG + " REGION - Service region for connections\n" +
|
||||
" -" + READ_FLAG + " UNIT - Provisioned read throughput units\n" +
|
||||
" -" + WRITE_FLAG + " UNIT - Provisioned write through put units\n" +
|
||||
" -" + TAG_FLAG + " key=value; list of tags to tag dynamo table\n" +
|
||||
"\n" +
|
||||
" URLs for Amazon DynamoDB are of the form dynamodb://TABLE_NAME.\n" +
|
||||
" Specifying both the -" + REGION_FLAG + " option and an S3A path\n" +
|
||||
@ -393,6 +395,8 @@ static class Init extends S3GuardTool {
|
||||
getCommandFormat().addOptionWithValue(READ_FLAG);
|
||||
// write capacity.
|
||||
getCommandFormat().addOptionWithValue(WRITE_FLAG);
|
||||
// tag
|
||||
getCommandFormat().addOptionWithValue(TAG_FLAG);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -420,6 +424,23 @@ public int run(String[] args, PrintStream out) throws Exception {
|
||||
getConf().setInt(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, writeCapacity);
|
||||
}
|
||||
|
||||
String tags = getCommandFormat().getOptValue(TAG_FLAG);
|
||||
if (tags != null && !tags.isEmpty()) {
|
||||
String[] stringList = tags.split(";");
|
||||
Map<String, String> tagsKV = new HashMap<>();
|
||||
for(String kv : stringList) {
|
||||
if(kv.isEmpty() || !kv.contains("=")){
|
||||
continue;
|
||||
}
|
||||
String[] kvSplit = kv.split("=");
|
||||
tagsKV.put(kvSplit[0], kvSplit[1]);
|
||||
}
|
||||
|
||||
for (Map.Entry<String, String> kv : tagsKV.entrySet()) {
|
||||
getConf().set(S3GUARD_DDB_TABLE_TAG + kv.getKey(), kv.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
// Validate parameters.
|
||||
try {
|
||||
parseDynamoDBRegion(paths);
|
||||
|
@ -415,6 +415,13 @@ pertaining to [Provisioned Throughput](http://docs.aws.amazon.com/amazondynamodb
|
||||
[-write PROVISIONED_WRITES] [-read PROVISIONED_READS]
|
||||
```
|
||||
|
||||
Tag argument can be added with a key=value list of tags. The table for the
|
||||
metadata store will be created with these tags in DynamoDB.
|
||||
|
||||
```bash
|
||||
[-tag key=value;]
|
||||
```
|
||||
|
||||
Example 1
|
||||
|
||||
```bash
|
||||
@ -434,6 +441,14 @@ hadoop s3guard init -meta dynamodb://ireland-team -region eu-west-1
|
||||
Creates a table "ireland-team" in the region "eu-west-1.amazonaws.com"
|
||||
|
||||
|
||||
Example 3
|
||||
|
||||
```bash
|
||||
hadoop s3guard init -meta dynamodb://ireland-team -tag tag1=first;tag2=second;
|
||||
```
|
||||
|
||||
Creates a table "ireland-team" with tags "first" and "second".
|
||||
|
||||
### Import a bucket: `s3guard import`
|
||||
|
||||
```bash
|
||||
|
@ -23,16 +23,21 @@
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||
import com.amazonaws.services.dynamodbv2.document.Item;
|
||||
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
|
||||
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||
import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest;
|
||||
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
|
||||
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
|
||||
import com.amazonaws.services.dynamodbv2.model.TableDescription;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.model.Tag;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||
@ -41,6 +46,7 @@
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
@ -621,6 +627,36 @@ public void testDeleteTable() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableTagging() throws IOException {
|
||||
final Configuration conf = getFileSystem().getConf();
|
||||
String tableName = "testTableTagging-" + UUID.randomUUID();
|
||||
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
|
||||
conf.set(S3GUARD_DDB_TABLE_CREATE_KEY, "true");
|
||||
|
||||
Map<String, String> tagMap = new HashMap<>();
|
||||
tagMap.put("hello", "dynamo");
|
||||
tagMap.put("tag", "youre it");
|
||||
for (Map.Entry<String, String> tagEntry : tagMap.entrySet()) {
|
||||
conf.set(S3GUARD_DDB_TABLE_TAG + tagEntry.getKey(), tagEntry.getValue());
|
||||
}
|
||||
|
||||
try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
|
||||
ddbms.initialize(conf);
|
||||
assertNotNull(ddbms.getTable());
|
||||
assertEquals(tableName, ddbms.getTable().getTableName());
|
||||
ListTagsOfResourceRequest listTagsOfResourceRequest =
|
||||
new ListTagsOfResourceRequest()
|
||||
.withResourceArn(ddbms.getTable().getDescription().getTableArn());
|
||||
List<Tag> tags = ddbms.getAmazonDynamoDB()
|
||||
.listTagsOfResource(listTagsOfResourceRequest).getTags();
|
||||
assertEquals(tagMap.size(), tags.size());
|
||||
for (Tag tag : tags) {
|
||||
Assert.assertEquals(tagMap.get(tag.getKey()), tag.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This validates the table is created and ACTIVE in DynamoDB.
|
||||
*
|
||||
|
@ -19,15 +19,21 @@
|
||||
package org.apache.hadoop.fs.s3a.s3guard;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||
import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest;
|
||||
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
|
||||
import com.amazonaws.services.dynamodbv2.model.Tag;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -39,8 +45,10 @@
|
||||
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Init;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_NAME_KEY;
|
||||
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*;
|
||||
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
|
||||
/**
|
||||
* Test S3Guard related CLI commands against DynamoDB.
|
||||
@ -92,6 +100,53 @@ public String call() throws Exception {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamoTableTagging() throws Exception {
|
||||
// setup
|
||||
Configuration conf = getConfiguration();
|
||||
conf.set(S3GUARD_DDB_TABLE_NAME_KEY,
|
||||
"testDynamoTableTagging-" + UUID.randomUUID());
|
||||
S3GuardTool.Init cmdR = new S3GuardTool.Init(conf);
|
||||
Map<String, String> tagMap = new HashMap<>();
|
||||
tagMap.put("hello", "dynamo");
|
||||
tagMap.put("tag", "youre it");
|
||||
|
||||
String[] argsR = new String[]{
|
||||
cmdR.getName(),
|
||||
"-tag", tagMapToStringParams(tagMap)
|
||||
};
|
||||
|
||||
// run
|
||||
cmdR.run(argsR);
|
||||
|
||||
// Check. Should create new metadatastore with the table name set.
|
||||
try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
|
||||
ddbms.initialize(conf);
|
||||
ListTagsOfResourceRequest listTagsOfResourceRequest = new ListTagsOfResourceRequest()
|
||||
.withResourceArn(ddbms.getTable().getDescription().getTableArn());
|
||||
List<Tag> tags = ddbms.getAmazonDynamoDB().listTagsOfResource(listTagsOfResourceRequest).getTags();
|
||||
|
||||
// assert
|
||||
assertEquals(tagMap.size(), tags.size());
|
||||
for (Tag tag : tags) {
|
||||
Assert.assertEquals(tagMap.get(tag.getKey()), tag.getValue());
|
||||
}
|
||||
// be sure to clean up - delete table
|
||||
ddbms.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
private String tagMapToStringParams(Map<String, String> tagMap) {
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
|
||||
for (Map.Entry<String, String> kv : tagMap.entrySet()) {
|
||||
stringBuilder.append(kv.getKey() + "=" + kv.getValue() + ";");
|
||||
}
|
||||
|
||||
return stringBuilder.toString();
|
||||
}
|
||||
|
||||
|
||||
private static class Capacities {
|
||||
private final long read, write;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user