HADOOP-14918. Remove the Local Dynamo DB test option. Contributed by Gabor Bota.

Sean Mackrory 2018-06-20 16:10:36 -06:00
parent d6ee4290df
commit b089a06793
11 changed files with 73 additions and 825 deletions

View File

@@ -2060,10 +2060,5 @@
   </profiles>
   <repositories>
-    <repository>
-      <id>dynamodb-local-oregon</id>
-      <name>DynamoDB Local Release Repository</name>
-      <url>https://s3-us-west-2.amazonaws.com/dynamodb-local/release</url>
-    </repository>
   </repositories>
 </project>

View File

@@ -36,7 +36,6 @@
     <downloadSources>true</downloadSources>
     <hadoop.tmp.dir>${project.build.directory}/test</hadoop.tmp.dir>
-    <dynamodb.local.version>1.11.86</dynamodb.local.version>
     <!-- are scale tests enabled ? -->
     <fs.s3a.scale.test.enabled>unset</fs.s3a.scale.test.enabled>
     <!-- Size in MB of huge files. -->
@@ -49,6 +48,8 @@
     <fs.s3a.s3guard.test.enabled>false</fs.s3a.s3guard.test.enabled>
     <fs.s3a.s3guard.test.authoritative>false</fs.s3a.s3guard.test.authoritative>
     <fs.s3a.s3guard.test.implementation>local</fs.s3a.s3guard.test.implementation>
+    <!-- Set a longer timeout for integration test (in milliseconds) -->
+    <test.integration.timeout>200000</test.integration.timeout>
   </properties>
@@ -162,6 +163,7 @@
     <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative>
     <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation>
+    <test.default.timeout>${test.integration.timeout}</test.default.timeout>
   </systemPropertyVariables>
   <!-- Some tests cannot run in parallel. Tests that cover -->
   <!-- access to the root directory must run in isolation -->
@@ -299,23 +301,10 @@
       </properties>
     </profile>
-    <!-- Switch to DynamoDBLocal for S3Guard. Has no effect unless S3Guard is enabled -->
-    <profile>
-      <id>dynamodblocal</id>
-      <activation>
-        <property>
-          <name>dynamodblocal</name>
-        </property>
-      </activation>
-      <properties>
-        <fs.s3a.s3guard.test.implementation>dynamodblocal</fs.s3a.s3guard.test.implementation>
-      </properties>
-    </profile>
     <!-- Switch S3Guard from Authoritative=false to true
      Has no effect unless S3Guard is enabled -->
     <profile>
-      <id>non-auth</id>
+      <id>auth</id>
       <activation>
         <property>
           <name>auth</name>
@@ -346,6 +335,9 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
+          <systemPropertyVariables>
+            <test.default.timeout>${test.integration.timeout}</test.default.timeout>
+          </systemPropertyVariables>
         </configuration>
       </plugin>
       <plugin>
@@ -417,26 +409,6 @@
       <artifactId>aws-java-sdk-bundle</artifactId>
       <scope>compile</scope>
     </dependency>
-    <dependency>
-      <groupId>com.amazonaws</groupId>
-      <artifactId>DynamoDBLocal</artifactId>
-      <version>${dynamodb.local.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.hamcrest</groupId>
-          <artifactId>hamcrest-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jetty</groupId>
-          <artifactId>jetty-http</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.commons</groupId>
-          <artifactId>commons-lang3</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
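
The test.default.timeout value wired through systemPropertyVariables above reaches the forked test JVM as an ordinary system property. A minimal sketch of how a test base class might resolve it; the constant names here are illustrative, not taken from hadoop-aws:

// Sketch only: resolve the per-run test timeout from the system property
// set by the surefire/failsafe configuration above.
public class TestTimeoutSketch {

  /** Property name used in the pom changes above. */
  public static final String TEST_DEFAULT_TIMEOUT = "test.default.timeout";

  /** Assumed fallback when the property is unset or unparseable. */
  public static final int DEFAULT_TIMEOUT_MILLIS = 60 * 1000;

  protected int getTestTimeoutMillis() {
    // Integer.getInteger falls back to the default if the property is
    // missing or not a valid integer.
    return Integer.getInteger(TEST_DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_MILLIS);
  }
}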

View File

@@ -401,6 +401,17 @@ private Constants() {
   public static final String S3GUARD_DDB_TABLE_NAME_KEY =
       "fs.s3a.s3guard.ddb.table";

+  /**
+   * Test table name to use during DynamoDB integration test.
+   *
+   * The table will be modified, and deleted in the end of the tests.
+   * If this value is not set, the integration tests that would be destructive
+   * won't run.
+   */
+  @InterfaceStability.Unstable
+  public static final String S3GUARD_DDB_TEST_TABLE_NAME_KEY =
+      "fs.s3a.s3guard.ddb.test.table";
+
   /**
    * Whether to create the DynamoDB table if the table does not exist.
    */
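
One way a test could honor the new key, skipping destructive cases when no test table is configured; this helper is hypothetical and not part of the commit:

// Hypothetical helper, not from this commit: skip destructive DynamoDB
// integration tests when fs.s3a.s3guard.ddb.test.table is unset.
import org.apache.hadoop.conf.Configuration;
import org.junit.Assume;

import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TEST_TABLE_NAME_KEY;

public final class TestTableGuard {

  private TestTableGuard() {
  }

  public static String requireTestTable(Configuration conf) {
    String table = conf.getTrimmed(S3GUARD_DDB_TEST_TABLE_NAME_KEY, "");
    // Assume.assumeTrue skips the calling JUnit test when false.
    Assume.assumeTrue("No test table configured under "
        + S3GUARD_DDB_TEST_TABLE_NAME_KEY, !table.isEmpty());
    return table;
  }
}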

View File

@@ -261,6 +261,7 @@ private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
   @Override
   @Retries.OnceRaw
   public void initialize(FileSystem fs) throws IOException {
+    Preconditions.checkNotNull(fs, "Null filesystem");
     Preconditions.checkArgument(fs instanceof S3AFileSystem,
         "DynamoDBMetadataStore only supports S3A filesystem.");
     owner = (S3AFileSystem) fs;
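
The new null check matters for diagnostics: 'null instanceof S3AFileSystem' is false, so without it a null argument would surface as an IllegalArgumentException blaming the filesystem type. A minimal standalone sketch of the fail-fast ordering, using Guava Preconditions as above:

import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public class InitOrderingSketch {
  public void initialize(FileSystem fs) {
    // a null argument now fails here with a clear "Null filesystem" NPE...
    Preconditions.checkNotNull(fs, "Null filesystem");
    // ...instead of reaching this check, where 'null instanceof ...' is
    // false and the message would wrongly blame the filesystem type.
    Preconditions.checkArgument(fs instanceof S3AFileSystem,
        "DynamoDBMetadataStore only supports S3A filesystem.");
  }
}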

View File

@@ -29,13 +29,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.File;
 import java.io.IOException;

 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;

 /**
  * An extension of the contract test base set up for S3A tests.
@@ -78,23 +75,7 @@ protected int getTestTimeoutMillis() {
    */
   @Override
   protected Configuration createConfiguration() {
-    Configuration conf = super.createConfiguration();
-    // patch in S3Guard options
-    maybeEnableS3Guard(conf);
-    // set hadoop temp dir to a default value
-    String testUniqueForkId =
-        System.getProperty(TEST_UNIQUE_FORK_ID);
-    String tmpDir = conf.get(Constants.HADOOP_TMP_DIR, "target/build/test");
-    if (testUniqueForkId != null) {
-      // patch temp dir for the specific branch
-      tmpDir = tmpDir + File.pathSeparatorChar + testUniqueForkId;
-      conf.set(Constants.HADOOP_TMP_DIR, tmpDir);
-    }
-    conf.set(Constants.BUFFER_DIR, tmpDir);
-    // add this so that even on tests where the FS is shared,
-    // the FS is always "magic"
-    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
-    return conf;
+    return S3ATestUtils.prepareTestConfiguration(super.createConfiguration());
   }

   protected Configuration getConfiguration() {

View File

@@ -143,7 +143,6 @@ public interface S3ATestConstants {
   String TEST_S3GUARD_IMPLEMENTATION = TEST_S3GUARD_PREFIX + ".implementation";
   String TEST_S3GUARD_IMPLEMENTATION_LOCAL = "local";
   String TEST_S3GUARD_IMPLEMENTATION_DYNAMO = "dynamo";
-  String TEST_S3GUARD_IMPLEMENTATION_DYNAMODBLOCAL = "dynamodblocal";
   String TEST_S3GUARD_IMPLEMENTATION_NONE = "none";

   /**

View File

@@ -30,9 +30,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
-import org.apache.hadoop.fs.s3a.s3guard.DynamoDBClientFactory;
-import org.apache.hadoop.fs.s3a.s3guard.DynamoDBLocalClientFactory;
-import org.apache.hadoop.fs.s3a.s3guard.S3Guard;

 import org.hamcrest.core.Is;
 import org.junit.Assert;
@@ -42,6 +39,7 @@
 import org.slf4j.LoggerFactory;

 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -56,6 +54,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
 import static org.junit.Assert.*;

 /**
@@ -393,9 +392,6 @@ public static void maybeEnableS3Guard(Configuration conf) {
     case TEST_S3GUARD_IMPLEMENTATION_LOCAL:
       implClass = S3GUARD_METASTORE_LOCAL;
       break;
-    case TEST_S3GUARD_IMPLEMENTATION_DYNAMODBLOCAL:
-      conf.setClass(S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
-          DynamoDBLocalClientFactory.class, DynamoDBClientFactory.class);
     case TEST_S3GUARD_IMPLEMENTATION_DYNAMO:
       implClass = S3GUARD_METASTORE_DYNAMO;
       break;
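
Note that the removed case had no break: it configured the local client factory and then fell through into the dynamo case, so both selected the same metastore class. A sketch of the remaining switch, using names from the diff; the switch variable and the default branch are assumptions:

// Sketch only; 'implementation' and the default branch are assumptions.
switch (implementation) {
case TEST_S3GUARD_IMPLEMENTATION_LOCAL:
  implClass = S3GUARD_METASTORE_LOCAL;
  break;
case TEST_S3GUARD_IMPLEMENTATION_DYNAMO:
  // previously also reachable by fall-through from "dynamodblocal"
  implClass = S3GUARD_METASTORE_DYNAMO;
  break;
default:
  implClass = S3GUARD_METASTORE_NULL;  // assumed name for the no-op store
}
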
@@ -489,6 +485,32 @@ public static <E extends Throwable, T extends Closeable> E interceptClosing(
     });
   }

+  /**
+   * Patch a configuration for testing.
+   * This includes possibly enabling s3guard, setting up the local
+   * FS temp dir and anything else needed for test runs.
+   * @param conf configuration to patch
+   * @return the now-patched configuration
+   */
+  public static Configuration prepareTestConfiguration(final Configuration conf) {
+    // patch in S3Guard options
+    maybeEnableS3Guard(conf);
+    // set hadoop temp dir to a default value
+    String testUniqueForkId =
+        System.getProperty(TEST_UNIQUE_FORK_ID);
+    String tmpDir = conf.get(HADOOP_TMP_DIR, "target/build/test");
+    if (testUniqueForkId != null) {
+      // patch temp dir for the specific branch
+      tmpDir = tmpDir + File.pathSeparatorChar + testUniqueForkId;
+      conf.set(HADOOP_TMP_DIR, tmpDir);
+    }
+    conf.set(BUFFER_DIR, tmpDir);
+    // add this so that even on tests where the FS is shared,
+    // the FS is always "magic"
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
+    return conf;
+  }
+
   /**
    * Helper class to do diffs of metrics.
    */
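
Callers now get the whole test setup in one call, as the AbstractS3ATestBase change above shows. A minimal usage sketch; the wrapper class is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3ATestUtils;

public final class PreparedConfSketch {

  private PreparedConfSketch() {
  }

  public static Configuration newTestConfiguration() {
    // one call covers what AbstractS3ATestBase used to inline: S3Guard
    // wiring, a per-fork hadoop.tmp.dir and buffer dir, and enabling
    // the magic committer.
    return S3ATestUtils.prepareTestConfiguration(new Configuration());
  }
}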

View File

@@ -49,6 +49,7 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.mockito.invocation.InvocationOnMock;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -514,6 +515,21 @@ public boolean isRecover() {
     }
   }

+  /**
+   * InvocationOnMock.getArgumentAt comes and goes with Mockito versions; this
+   * helper method is designed to be resilient to change.
+   * @param invocation invocation to query
+   * @param index argument index
+   * @param clazz class of return type
+   * @param <T> type of return
+   * @return the argument of the invocation, cast to the given type.
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> T getArgumentAt(InvocationOnMock invocation, int index,
+      Class<T> clazz) {
+    return (T) invocation.getArguments()[index];
+  }
+
   /**
    * Instantiate mock client with the results and errors requested.
    * @param results results to accrue
@@ -539,7 +555,7 @@ public static AmazonS3 newMockS3Client(final ClientResults results,
             "Mock Fail on init " + results.requests.size());
       }
       String uploadId = UUID.randomUUID().toString();
-      InitiateMultipartUploadRequest req = invocation.getArgumentAt(
+      InitiateMultipartUploadRequest req = getArgumentAt(invocation,
           0, InitiateMultipartUploadRequest.class);
       results.requests.put(uploadId, req);
       results.activeUploads.put(uploadId, req.getKey());
@@ -561,7 +577,7 @@ public static AmazonS3 newMockS3Client(final ClientResults results,
         throw new AmazonClientException(
             "Mock Fail on upload " + results.parts.size());
       }
-      UploadPartRequest req = invocation.getArgumentAt(
+      UploadPartRequest req = getArgumentAt(invocation,
           0, UploadPartRequest.class);
       results.parts.add(req);
       String etag = UUID.randomUUID().toString();
@@ -588,7 +604,7 @@ public static AmazonS3 newMockS3Client(final ClientResults results,
         throw new AmazonClientException(
             "Mock Fail on commit " + results.commits.size());
       }
-      CompleteMultipartUploadRequest req = invocation.getArgumentAt(
+      CompleteMultipartUploadRequest req = getArgumentAt(invocation,
          0, CompleteMultipartUploadRequest.class);
      results.commits.add(req);
      results.activeUploads.remove(req.getUploadId());
@@ -608,7 +624,7 @@ public static AmazonS3 newMockS3Client(final ClientResults results,
         throw new AmazonClientException(
             "Mock Fail on abort " + results.aborts.size());
       }
-      AbortMultipartUploadRequest req = invocation.getArgumentAt(
+      AbortMultipartUploadRequest req = getArgumentAt(invocation,
          0, AbortMultipartUploadRequest.class);
      String id = req.getUploadId();
      String p = results.activeUploads.remove(id);
@@ -630,7 +646,7 @@ public static AmazonS3 newMockS3Client(final ClientResults results,
     doAnswer(invocation -> {
       LOG.debug("deleteObject for {}", mockClient);
       synchronized (lock) {
-        results.deletes.add(invocation.getArgumentAt(
+        results.deletes.add(getArgumentAt(invocation,
             0, DeleteObjectRequest.class));
         return null;
       }
@@ -643,8 +659,8 @@ public static AmazonS3 newMockS3Client(final ClientResults results,
       LOG.debug("deleteObject for {}", mockClient);
       synchronized (lock) {
         results.deletes.add(new DeleteObjectRequest(
-            invocation.getArgumentAt(0, String.class),
-            invocation.getArgumentAt(1, String.class)
+            getArgumentAt(invocation, 0, String.class),
+            getArgumentAt(invocation, 1, String.class)
         ));
         return null;
       }
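
The doAnswer pattern used above can be sketched standalone, with the version-proof argument access inlined; Mockito 1.x/2.x and the AWS SDK v1 on the test classpath are assumed:

import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectRequest;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

public final class MockDeleteSketch {

  private MockDeleteSketch() {
  }

  /** Build a mock S3 client that records every DeleteObjectRequest. */
  public static AmazonS3 newDeleteTrackingClient(
      final List<DeleteObjectRequest> deletes) {
    AmazonS3 client = mock(AmazonS3.class);
    doAnswer(invocation -> {
      // read argument 0 directly; stable across Mockito versions
      deletes.add((DeleteObjectRequest) invocation.getArguments()[0]);
      return null;
    }).when(client).deleteObject(any(DeleteObjectRequest.class));
    return client;
  }
}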

View File

@@ -1,160 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import java.io.File;
import java.io.IOException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
import org.apache.hadoop.net.ServerSocketUtil;
import static org.apache.hadoop.fs.s3a.Constants.AWS_CREDENTIALS_PROVIDER;
import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBClientFactory.DefaultDynamoDBClientFactory.getRegion;
/**
* A DynamoDBClientFactory implementation that creates AmazonDynamoDB clients
* against an in-memory DynamoDBLocal server instance.
*
* You won't be charged bills for issuing any DynamoDB requests. However,
* DynamoDBLocal is considered a simulator of the DynamoDB web service, so it
* may be stale or behave differently. For example, throttling is not yet
* supported in DynamoDBLocal. This is for testing purposes only.
*
* To use this for creating a DynamoDB client in tests:
* <ol>
* <li>
* As with all DynamoDBClientFactory implementations, this should be configured.
* </li>
* <li>
* The singleton DynamoDBLocal server instance is started automatically when
* creating the AmazonDynamoDB client for the first time. It is still worth
* launching the server before all the tests, to fail fast if an error happens.
* </li>
* <li>
* The server can be stopped explicitly, which is not actually needed in
* tests as JVM termination will do that.
* </li>
* </ol>
*
* @see DefaultDynamoDBClientFactory
*/
public class DynamoDBLocalClientFactory extends Configured
implements DynamoDBClientFactory {
/** The DynamoDBLocal dynamoDBLocalServer instance for testing. */
private static DynamoDBProxyServer dynamoDBLocalServer;
private static String ddbEndpoint;
private static final String SYSPROP_SQLITE_LIB = "sqlite4java.library.path";
@Override
public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
throws IOException {
startSingletonServer();
final Configuration conf = getConf();
// use the default credential provider chain
conf.unset(AWS_CREDENTIALS_PROVIDER);
final AWSCredentialsProvider credentials =
createAWSCredentialProviderSet(null, conf);
final ClientConfiguration awsConf =
DefaultS3ClientFactory.createAwsConf(conf);
// fail fast in case of service errors
awsConf.setMaxErrorRetry(3);
final String region = getRegion(conf, defaultRegion);
LOG.info("Creating DynamoDBLocal client using endpoint {} in region {}",
ddbEndpoint, region);
return AmazonDynamoDBClientBuilder.standard()
.withCredentials(credentials)
.withClientConfiguration(awsConf)
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(ddbEndpoint, region))
.build();
}
/**
* Start a singleton in-memory DynamoDBLocal server if not started yet.
* @throws IOException if any error occurs
*/
public synchronized static void startSingletonServer() throws IOException {
if (dynamoDBLocalServer != null) {
return;
}
// Set this property if it has not been set elsewhere
if (StringUtils.isEmpty(System.getProperty(SYSPROP_SQLITE_LIB))) {
String projectBuildDir = System.getProperty("project.build.directory");
if (StringUtils.isEmpty(projectBuildDir)) {
projectBuildDir = "target";
}
// sqlite4java lib should have been copied to $projectBuildDir/native-libs
System.setProperty(SYSPROP_SQLITE_LIB,
projectBuildDir + File.separator + "native-libs");
LOG.info("Setting {} -> {}",
SYSPROP_SQLITE_LIB, System.getProperty(SYSPROP_SQLITE_LIB));
}
try {
// Start an in-memory local DynamoDB instance
final String port = String.valueOf(ServerSocketUtil.getPort(0, 100));
ddbEndpoint = "http://localhost:" + port;
dynamoDBLocalServer = ServerRunner.createServerFromCommandLineArgs(
new String[]{"-inMemory", "-port", port});
dynamoDBLocalServer.start();
LOG.info("DynamoDBLocal singleton server was started at {}", ddbEndpoint);
} catch (Exception t) {
String msg = "Error starting DynamoDBLocal server at " + ddbEndpoint
+ " " + t;
LOG.error(msg, t);
throw new IOException(msg, t);
}
}
/**
* Stop the in-memory DynamoDBLocal server if it is started.
* @throws IOException if any error occurs
*/
public synchronized static void stopSingletonServer() throws IOException {
if (dynamoDBLocalServer != null) {
LOG.info("Shutting down the in-memory DynamoDBLocal server");
try {
dynamoDBLocalServer.stop();
} catch (Throwable t) {
String msg = "Error stopping DynamoDBLocal server at " + ddbEndpoint;
LOG.error(msg, t);
throw new IOException(msg, t);
}
}
}
}

View File

@@ -28,7 +28,6 @@
 import com.google.common.collect.Sets;

 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,6 +42,7 @@
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.HadoopTestBase;

 /**
  * Main test class for MetadataStore implementations.
@@ -51,7 +51,7 @@
  * If your implementation may return missing results for recently set paths,
  * override {@link MetadataStoreTestBase#allowMissing()}.
  */
-public abstract class MetadataStoreTestBase extends Assert {
+public abstract class MetadataStoreTestBase extends HadoopTestBase {

   private static final Logger LOG =
       LoggerFactory.getLogger(MetadataStoreTestBase.class);

View File

@@ -1,589 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.s3a.Tristate;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.MockS3ClientFactory;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ClientFactory;
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*;
import static org.apache.hadoop.test.LambdaTestUtils.*;
/**
* Test that {@link DynamoDBMetadataStore} implements {@link MetadataStore}.
*
* In this unit test, we use an in-memory DynamoDBLocal server instead of real
* AWS DynamoDB. An {@link S3AFileSystem} object is created and shared for
* initializing {@link DynamoDBMetadataStore} objects. There are no real S3
* requests issued, as the underlying AWS S3Client is mocked. You won't be
* charged bills for AWS S3 or DynamoDB when you run this test.
*
* As specified by the base class, every test case has an independent contract
* that creates a new {@link DynamoDBMetadataStore} instance and initializes it.
* A table will be created for each test by the test contract, and will be
* destroyed after the test case finishes.
*/
public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestDynamoDBMetadataStore.class);
private static final String BUCKET = "TestDynamoDBMetadataStore";
private static final String S3URI =
URI.create(FS_S3A + "://" + BUCKET + "/").toString();
public static final PrimaryKey
VERSION_MARKER_PRIMARY_KEY = createVersionMarkerPrimaryKey(
DynamoDBMetadataStore.VERSION_MARKER);
/** The DynamoDB instance that can issue requests directly to server. */
private static DynamoDB dynamoDB;
@Rule
public final Timeout timeout = new Timeout(60 * 1000);
/**
* Start the in-memory DynamoDBLocal server and initialize the S3 file system.
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
DynamoDBLocalClientFactory.startSingletonServer();
try {
dynamoDB = new DynamoDBMSContract().getMetadataStore().getDynamoDB();
} catch (AmazonServiceException e) {
final String msg = "Cannot initialize a DynamoDBMetadataStore instance "
+ "against the local DynamoDB server. Perhaps the DynamoDBLocal "
+ "server is not configured correctly. ";
LOG.error(msg, e);
// fail fast if the DynamoDBLocal server can not work
throw e;
}
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
if (dynamoDB != null) {
dynamoDB.shutdown();
}
DynamoDBLocalClientFactory.stopSingletonServer();
}
/**
* Each contract has its own S3AFileSystem and DynamoDBMetadataStore objects.
*/
private static class DynamoDBMSContract extends AbstractMSContract {
private final S3AFileSystem s3afs;
private final DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
DynamoDBMSContract() throws IOException {
this(new Configuration());
}
DynamoDBMSContract(Configuration conf) throws IOException {
// using mocked S3 clients
conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
S3ClientFactory.class);
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, S3URI);
// setting config for creating a DynamoDBClient against local server
conf.set(ACCESS_KEY, "dummy-access-key");
conf.set(SECRET_KEY, "dummy-secret-key");
conf.setBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, true);
conf.setClass(S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
DynamoDBLocalClientFactory.class, DynamoDBClientFactory.class);
// always create new file system object for a test contract
s3afs = (S3AFileSystem) FileSystem.newInstance(conf);
ms.initialize(s3afs);
}
@Override
public S3AFileSystem getFileSystem() {
return s3afs;
}
@Override
public DynamoDBMetadataStore getMetadataStore() {
return ms;
}
}
@Override
public DynamoDBMSContract createContract() throws IOException {
return new DynamoDBMSContract();
}
@Override
public DynamoDBMSContract createContract(Configuration conf) throws
IOException {
return new DynamoDBMSContract(conf);
}
@Override
FileStatus basicFileStatus(Path path, int size, boolean isDir)
throws IOException {
String owner = UserGroupInformation.getCurrentUser().getShortUserName();
return isDir
? new S3AFileStatus(true, path, owner)
: new S3AFileStatus(size, getModTime(), path, BLOCK_SIZE, owner);
}
private DynamoDBMetadataStore getDynamoMetadataStore() throws IOException {
return (DynamoDBMetadataStore) getContract().getMetadataStore();
}
private S3AFileSystem getFileSystem() throws IOException {
return (S3AFileSystem) getContract().getFileSystem();
}
/**
* This tests that after initialize() using an S3AFileSystem object, the
* instance should have been initialized successfully, and tables are ACTIVE.
*/
@Test
public void testInitialize() throws IOException {
final String tableName = "testInitializeWithFileSystem";
final S3AFileSystem s3afs = getFileSystem();
final Configuration conf = s3afs.getConf();
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
ddbms.initialize(s3afs);
verifyTableInitialized(tableName);
assertNotNull(ddbms.getTable());
assertEquals(tableName, ddbms.getTable().getTableName());
String expectedRegion = conf.get(S3GUARD_DDB_REGION_KEY,
s3afs.getBucketLocation(tableName));
assertEquals("DynamoDB table should be in configured region or the same" +
" region as S3 bucket",
expectedRegion,
ddbms.getRegion());
}
}
/**
* This tests that after initialize() using a Configuration object, the
* instance should have been initialized successfully, and tables are ACTIVE.
*/
@Test
public void testInitializeWithConfiguration() throws IOException {
final String tableName = "testInitializeWithConfiguration";
final Configuration conf = getFileSystem().getConf();
conf.unset(S3GUARD_DDB_TABLE_NAME_KEY);
String savedRegion = conf.get(S3GUARD_DDB_REGION_KEY,
getFileSystem().getBucketLocation());
conf.unset(S3GUARD_DDB_REGION_KEY);
try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
ddbms.initialize(conf);
fail("Should have failed because the table name is not set!");
} catch (IllegalArgumentException ignored) {
}
// config table name
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
ddbms.initialize(conf);
fail("Should have failed because as the region is not set!");
} catch (IllegalArgumentException ignored) {
}
// config region
conf.set(S3GUARD_DDB_REGION_KEY, savedRegion);
try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
ddbms.initialize(conf);
verifyTableInitialized(tableName);
assertNotNull(ddbms.getTable());
assertEquals(tableName, ddbms.getTable().getTableName());
assertEquals("Unexpected key schema found!",
keySchema(),
ddbms.getTable().describe().getKeySchema());
}
}
/**
* Test that for a large batch write request, the limit is handled correctly.
*/
@Test
public void testBatchWrite() throws IOException {
final int[] numMetasToDeleteOrPut = {
-1, // null
0, // empty collection
1, // one path
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT, // exact limit of a batch request
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT + 1 // limit + 1
};
for (int numOldMetas : numMetasToDeleteOrPut) {
for (int numNewMetas : numMetasToDeleteOrPut) {
doTestBatchWrite(numOldMetas, numNewMetas);
}
}
}
private void doTestBatchWrite(int numDelete, int numPut) throws IOException {
final String root = S3URI + "/testBatchWrite_" + numDelete + '_' + numPut;
final Path oldDir = new Path(root, "oldDir");
final Path newDir = new Path(root, "newDir");
LOG.info("doTestBatchWrite: oldDir={}, newDir={}", oldDir, newDir);
DynamoDBMetadataStore ms = getDynamoMetadataStore();
ms.put(new PathMetadata(basicFileStatus(oldDir, 0, true)));
ms.put(new PathMetadata(basicFileStatus(newDir, 0, true)));
final List<PathMetadata> oldMetas =
numDelete < 0 ? null : new ArrayList<PathMetadata>(numDelete);
for (int i = 0; i < numDelete; i++) {
oldMetas.add(new PathMetadata(
basicFileStatus(new Path(oldDir, "child" + i), i, true)));
}
final List<PathMetadata> newMetas =
numPut < 0 ? null : new ArrayList<PathMetadata>(numPut);
for (int i = 0; i < numPut; i++) {
newMetas.add(new PathMetadata(
basicFileStatus(new Path(newDir, "child" + i), i, false)));
}
Collection<Path> pathsToDelete = null;
if (oldMetas != null) {
// put all metadata of old paths and verify
ms.put(new DirListingMetadata(oldDir, oldMetas, false));
assertEquals(0, ms.listChildren(newDir).withoutTombstones().numEntries());
assertTrue(CollectionUtils.isEqualCollection(oldMetas,
ms.listChildren(oldDir).getListing()));
pathsToDelete = new ArrayList<>(oldMetas.size());
for (PathMetadata meta : oldMetas) {
pathsToDelete.add(meta.getFileStatus().getPath());
}
}
// move the old paths to new paths and verify
ms.move(pathsToDelete, newMetas);
assertEquals(0, ms.listChildren(oldDir).withoutTombstones().numEntries());
if (newMetas != null) {
assertTrue(CollectionUtils.isEqualCollection(newMetas,
ms.listChildren(newDir).getListing()));
}
}
@Test
public void testInitExistingTable() throws IOException {
final DynamoDBMetadataStore ddbms = getDynamoMetadataStore();
final String tableName = ddbms.getTable().getTableName();
verifyTableInitialized(tableName);
// create existing table
ddbms.initTable();
verifyTableInitialized(tableName);
}
/**
* Test the low level version check code.
*/
@Test
public void testItemVersionCompatibility() throws Throwable {
verifyVersionCompatibility("table",
createVersionMarker(VERSION_MARKER, VERSION, 0));
}
/**
* Test that a version marker entry without the version number field
* is rejected as incompatible with a meaningful error message.
*/
@Test
public void testItemLacksVersion() throws Throwable {
intercept(IOException.class, E_NOT_VERSION_MARKER,
new VoidCallable() {
@Override
public void call() throws Exception {
verifyVersionCompatibility("table",
new Item().withPrimaryKey(
createVersionMarkerPrimaryKey(VERSION_MARKER)));
}
});
}
/**
* Delete the version marker and verify that table init fails.
*/
@Test
public void testTableVersionRequired() throws Exception {
Configuration conf = getFileSystem().getConf();
int maxRetries = conf.getInt(S3GUARD_DDB_MAX_RETRIES,
S3GUARD_DDB_MAX_RETRIES_DEFAULT);
conf.setInt(S3GUARD_DDB_MAX_RETRIES, 3);
final DynamoDBMetadataStore ddbms = createContract(conf).getMetadataStore();
String tableName = conf.get(S3GUARD_DDB_TABLE_NAME_KEY, BUCKET);
Table table = verifyTableInitialized(tableName);
table.deleteItem(VERSION_MARKER_PRIMARY_KEY);
// create existing table
intercept(IOException.class, E_NO_VERSION_MARKER,
new VoidCallable() {
@Override
public void call() throws Exception {
ddbms.initTable();
}
});
conf.setInt(S3GUARD_DDB_MAX_RETRIES, maxRetries);
}
/**
* Set the version value to a different number and verify that
* table init fails.
*/
@Test
public void testTableVersionMismatch() throws Exception {
final DynamoDBMetadataStore ddbms = createContract().getMetadataStore();
String tableName = getFileSystem().getConf()
.get(S3GUARD_DDB_TABLE_NAME_KEY, BUCKET);
Table table = verifyTableInitialized(tableName);
table.deleteItem(VERSION_MARKER_PRIMARY_KEY);
Item v200 = createVersionMarker(VERSION_MARKER, 200, 0);
table.putItem(v200);
// create existing table
intercept(IOException.class, E_INCOMPATIBLE_VERSION,
new VoidCallable() {
@Override
public void call() throws Exception {
ddbms.initTable();
}
});
}
/**
* Test that initTable fails with IOException when table does not exist and
* table auto-creation is disabled.
*/
@Test
public void testFailNonexistentTable() throws IOException {
final String tableName = "testFailNonexistentTable";
final S3AFileSystem s3afs = getFileSystem();
final Configuration conf = s3afs.getConf();
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
conf.unset(S3GUARD_DDB_TABLE_CREATE_KEY);
try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
ddbms.initialize(s3afs);
fail("Should have failed as table does not exist and table auto-creation"
+ " is disabled");
} catch (IOException ignored) {
}
}
/**
* Test cases about root directory as it is not in the DynamoDB table.
*/
@Test
public void testRootDirectory() throws IOException {
final DynamoDBMetadataStore ddbms = getDynamoMetadataStore();
Path rootPath = new Path(S3URI);
verifyRootDirectory(ddbms.get(rootPath), true);
ddbms.put(new PathMetadata(new S3AFileStatus(true,
new Path(rootPath, "foo"),
UserGroupInformation.getCurrentUser().getShortUserName())));
verifyRootDirectory(ddbms.get(new Path(S3URI)), false);
}
private void verifyRootDirectory(PathMetadata rootMeta, boolean isEmpty) {
assertNotNull(rootMeta);
final FileStatus status = rootMeta.getFileStatus();
assertNotNull(status);
assertTrue(status.isDirectory());
// UNKNOWN is always a valid option, but true / false should not contradict
if (isEmpty) {
assertNotSame("Should not be marked non-empty",
Tristate.FALSE,
rootMeta.isEmptyDirectory());
} else {
assertNotSame("Should not be marked empty",
Tristate.TRUE,
rootMeta.isEmptyDirectory());
}
}
/**
* Test that when moving nested paths, all its ancestors up to destination
* root will also be created.
* Here is the directory tree before move:
* <pre>
* testMovePopulateAncestors
* ├── a
* │   └── b
* │       └── src
* │           ├── dir1
* │           │   └── dir2
* │           └── file1.txt
* └── c
*     └── d
*         └── dest
* </pre>
* As part of rename(a/b/src, c/d/e/dest), S3A will enumerate the subtree at
* a/b/src. This test verifies that after the move, the new subtree at
* 'dest' is reachable from the root (i.e. c/ and c/d exist in the table).
* DynamoDBMetadataStore depends on this property to do recursive delete
* without a full table scan.
*/
@Test
public void testMovePopulatesAncestors() throws IOException {
final DynamoDBMetadataStore ddbms = getDynamoMetadataStore();
final String testRoot = "/testMovePopulatesAncestors";
final String srcRoot = testRoot + "/a/b/src";
final String destRoot = testRoot + "/c/d/e/dest";
final Path nestedPath1 = strToPath(srcRoot + "/file1.txt");
ddbms.put(new PathMetadata(basicFileStatus(nestedPath1, 1024, false)));
final Path nestedPath2 = strToPath(srcRoot + "/dir1/dir2");
ddbms.put(new PathMetadata(basicFileStatus(nestedPath2, 0, true)));
// We don't put the destRoot path here, since put() would create ancestor
// entries, and we want to ensure that move() does it, instead.
// Build enumeration of src / dest paths and do the move()
final Collection<Path> fullSourcePaths = Lists.newArrayList(
strToPath(srcRoot),
strToPath(srcRoot + "/dir1"),
strToPath(srcRoot + "/dir1/dir2"),
strToPath(srcRoot + "/file1.txt")
);
final Collection<PathMetadata> pathsToCreate = Lists.newArrayList(
new PathMetadata(basicFileStatus(strToPath(destRoot),
0, true)),
new PathMetadata(basicFileStatus(strToPath(destRoot + "/dir1"),
0, true)),
new PathMetadata(basicFileStatus(strToPath(destRoot + "/dir1/dir2"),
0, true)),
new PathMetadata(basicFileStatus(strToPath(destRoot + "/file1.txt"),
1024, false))
);
ddbms.move(fullSourcePaths, pathsToCreate);
// assert that all the ancestors should have been populated automatically
assertCached(testRoot + "/c");
assertCached(testRoot + "/c/d");
assertCached(testRoot + "/c/d/e");
assertCached(destRoot /* /c/d/e/dest */);
// Also check moved files while we're at it
assertCached(destRoot + "/dir1");
assertCached(destRoot + "/dir1/dir2");
assertCached(destRoot + "/file1.txt");
}
@Test
public void testProvisionTable() throws IOException {
final DynamoDBMetadataStore ddbms = getDynamoMetadataStore();
final String tableName = ddbms.getTable().getTableName();
final ProvisionedThroughputDescription oldProvision =
dynamoDB.getTable(tableName).describe().getProvisionedThroughput();
ddbms.provisionTable(oldProvision.getReadCapacityUnits() * 2,
oldProvision.getWriteCapacityUnits() * 2);
final ProvisionedThroughputDescription newProvision =
dynamoDB.getTable(tableName).describe().getProvisionedThroughput();
LOG.info("Old provision = {}, new provision = {}",
oldProvision, newProvision);
assertEquals(oldProvision.getReadCapacityUnits() * 2,
newProvision.getReadCapacityUnits().longValue());
assertEquals(oldProvision.getWriteCapacityUnits() * 2,
newProvision.getWriteCapacityUnits().longValue());
}
@Test
public void testDeleteTable() throws Exception {
final String tableName = "testDeleteTable";
final S3AFileSystem s3afs = getFileSystem();
final Configuration conf = s3afs.getConf();
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
ddbms.initialize(s3afs);
// we can list the empty table
ddbms.listChildren(new Path(S3URI));
ddbms.destroy();
verifyTableNotExist(tableName);
// delete table once more; the ResourceNotFoundException is swallowed silently
ddbms.destroy();
verifyTableNotExist(tableName);
try {
// we can no longer list the destroyed table
ddbms.listChildren(new Path(S3URI));
fail("Should have failed after the table is destroyed!");
} catch (IOException ignored) {
}
}
}
/**
* This validates the table is created and ACTIVE in DynamoDB.
*
* This should not rely on the {@link DynamoDBMetadataStore} implementation.
* Return the table
*/
private static Table verifyTableInitialized(String tableName) {
final Table table = dynamoDB.getTable(tableName);
final TableDescription td = table.describe();
assertEquals(tableName, td.getTableName());
assertEquals("ACTIVE", td.getTableStatus());
return table;
}
/**
* This validates the table is not found in DynamoDB.
*
* This should not rely on the {@link DynamoDBMetadataStore} implementation.
*/
private static void verifyTableNotExist(String tableName) throws Exception {
intercept(ResourceNotFoundException.class,
() -> dynamoDB.getTable(tableName).describe());
}
}