HADOOP-14759 S3GuardTool prune to prune specific bucket entries. Contributed by Gabor Bota.
commit ea3849f0cc
parent 6cf023f9b7
@@ -812,23 +812,33 @@ public void destroy() throws IOException {
   }
 
   @Retries.OnceRaw
-  private ItemCollection<ScanOutcome> expiredFiles(long modTime) {
-    String filterExpression = "mod_time < :mod_time";
+  private ItemCollection<ScanOutcome> expiredFiles(long modTime,
+      String keyPrefix) {
+    String filterExpression =
+        "mod_time < :mod_time and begins_with(parent, :parent)";
     String projectionExpression = "parent,child";
-    ValueMap map = new ValueMap().withLong(":mod_time", modTime);
+    ValueMap map = new ValueMap()
+        .withLong(":mod_time", modTime)
+        .withString(":parent", keyPrefix);
     return table.scan(filterExpression, projectionExpression, null, map);
   }
 
   @Override
   @Retries.OnceRaw("once(batchWrite)")
   public void prune(long modTime) throws IOException {
+    prune(modTime, "/");
+  }
+
+  @Override
+  @Retries.OnceRaw("once(batchWrite)")
+  public void prune(long modTime, String keyPrefix) throws IOException {
     int itemCount = 0;
     try {
       Collection<Path> deletionBatch =
           new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
       int delay = conf.getInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
          S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT);
-      for (Item item : expiredFiles(modTime)) {
+      for (Item item : expiredFiles(modTime, keyPrefix)) {
         PathMetadata md = PathMetadataDynamoDBTranslation
             .itemToPathMetadata(item, username);
         Path path = md.getFileStatus().getPath();
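The hunk above narrows the DynamoDB scan: `begins_with(parent, :parent)` is evaluated server-side, so only items whose `parent` attribute starts with the supplied prefix are considered for pruning. Below is a minimal sketch of the same scan using the AWS SDK document API; the table handle, attribute names and the prefix value are assumptions taken from the patch, not a complete S3Guard client.

```java
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;

/** Sketch only: scan an S3Guard-style table for old entries under a prefix. */
final class ExpiredFilesScanSketch {
  static void printExpired(Table table, long modTime, String keyPrefix) {
    // Same filter as the patch: older than modTime AND under keyPrefix,
    // e.g. keyPrefix = "/ireland-1/path_prefix".
    String filter = "mod_time < :mod_time and begins_with(parent, :parent)";
    ValueMap values = new ValueMap()
        .withLong(":mod_time", modTime)
        .withString(":parent", keyPrefix);
    // Project only the key attributes needed to build deletion batches.
    for (Item item : table.scan(filter, "parent,child", null, values)) {
      System.out.println(item.getString("parent") + "/" + item.getString("child"));
    }
  }
}
```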
@@ -303,12 +303,18 @@ public void destroy() throws IOException {
   }
 
   @Override
-  public synchronized void prune(long modTime) throws IOException {
+  public void prune(long modTime) throws IOException{
+    prune(modTime, "");
+  }
+
+  @Override
+  public synchronized void prune(long modTime, String keyPrefix)
+      throws IOException {
     Iterator<Map.Entry<Path, PathMetadata>> files =
         fileHash.entrySet().iterator();
     while (files.hasNext()) {
       Map.Entry<Path, PathMetadata> entry = files.next();
-      if (expired(entry.getValue().getFileStatus(), modTime)) {
+      if (expired(entry.getValue().getFileStatus(), modTime, keyPrefix)) {
         files.remove();
       }
     }
@@ -323,7 +329,7 @@ public synchronized void prune(long modTime) throws IOException {
 
       for (PathMetadata child : oldChildren) {
         FileStatus status = child.getFileStatus();
-        if (!expired(status, modTime)) {
+        if (!expired(status, modTime, keyPrefix)) {
           newChildren.add(child);
         }
       }
@@ -339,10 +345,11 @@ public synchronized void prune(long modTime) throws IOException {
     }
   }
 
-  private boolean expired(FileStatus status, long expiry) {
+  private boolean expired(FileStatus status, long expiry, String keyPrefix) {
     // Note: S3 doesn't track modification time on directories, so for
     // consistency with the DynamoDB implementation we ignore that here
-    return status.getModificationTime() < expiry && !status.isDirectory();
+    return status.getModificationTime() < expiry && !status.isDirectory()
+        && status.getPath().toString().startsWith(keyPrefix);
   }
 
   @VisibleForTesting
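In this in-memory store the prefix check runs against the full path URI (`status.getPath().toString()`), and the one-argument `prune` delegates with an empty prefix, which matches every path. A small sketch of that string semantics follows; the bucket and path values are illustrative.

```java
import org.apache.hadoop.fs.Path;

/** Sketch: prefix matching as the expired(status, expiry, keyPrefix) check applies it. */
final class LocalPrefixCheckSketch {
  public static void main(String[] args) {
    Path file = new Path("s3a://ireland-1/path_prefix/stale");

    // prune(modTime) delegates with "" - every path starts with "", so the
    // old prune-everything-old behaviour is preserved.
    System.out.println(file.toString().startsWith(""));  // true

    // With a real prefix, only entries under it are eligible for removal;
    // note that this store compares against the full URI string, so the
    // prefix includes the scheme and bucket.
    System.out.println(
        file.toString().startsWith("s3a://ireland-1/path_prefix"));  // true
    System.out.println(
        file.toString().startsWith("s3a://ireland-1/other"));        // false
  }
}
```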
@@ -223,6 +223,18 @@ void move(Collection<Path> pathsToDelete,
    */
   void prune(long modTime) throws IOException, UnsupportedOperationException;
 
+  /**
+   * Same as {@link MetadataStore#prune(long)}, but with an additional
+   * keyPrefix parameter to filter the pruned keys with a prefix.
+   *
+   * @param modTime Oldest modification time to allow
+   * @param keyPrefix The prefix for the keys that should be removed
+   * @throws IOException if there is an error
+   * @throws UnsupportedOperationException if not implemented
+   */
+  void prune(long modTime, String keyPrefix)
+      throws IOException, UnsupportedOperationException;
+
   /**
    * Get any diagnostics information from a store, as a list of (key, value)
    * tuples for display. Arbitrary values; no guarantee of stability.
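For callers of the interface, the new overload behaves like the existing one but is scoped to a prefix. A hedged usage sketch; `store` stands for any initialized `MetadataStore` implementation and the prefix value is illustrative.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;

final class PrunePrefixUsageSketch {
  /** Sketch: remove week-old entries under one bucket/path prefix only. */
  static void pruneLastWeek(MetadataStore store, String keyPrefix)
      throws IOException {
    long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);

    // Existing form: prunes matching entries across the whole table.
    // store.prune(cutoff);

    // New form: prunes only entries whose key starts with keyPrefix,
    // e.g. "/ireland-1/path_prefix" for the DynamoDB-backed store.
    store.prune(cutoff, keyPrefix);
  }
}
```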
@@ -99,6 +99,10 @@ public void destroy() throws IOException {
   public void prune(long modTime) {
   }
 
+  @Override
+  public void prune(long modTime, String keyPrefix) {
+  }
+
   @Override
   public String toString() {
     return "NullMetadataStore";
@@ -966,8 +966,16 @@ public int run(String[] args, PrintStream out) throws
     long now = System.currentTimeMillis();
     long divide = now - delta;
 
+    // remove the protocol from path string to get keyPrefix
+    // by default the keyPrefix is "/" - unless the s3 URL is provided
+    String keyPrefix = "/";
+    if(paths.size() > 0) {
+      Path path = new Path(paths.get(0));
+      keyPrefix = PathMetadataDynamoDBTranslation.pathToParentKey(path);
+    }
+
     try {
-      getStore().prune(divide);
+      getStore().prune(divide, keyPrefix);
     } catch (UnsupportedOperationException e){
       errorln("Prune operation not supported in metadata store.");
     }
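The prefix handed to `prune` is the DynamoDB parent-key form of the URI given on the command line. A sketch of the expected mapping; it assumes `pathToParentKey` is reachable from the caller's package (the sketch is placed in the s3guard package for that reason) and uses an illustrative bucket and path.

```java
package org.apache.hadoop.fs.s3a.s3guard;

import org.apache.hadoop.fs.Path;

/** Sketch: deriving the prune key prefix from a command-line URI. */
final class PruneKeyPrefixSketch {
  public static void main(String[] args) {
    // No URI on the command line: the default prefix "/" matches every
    // entry in the table, preserving the old prune-everything behaviour.
    String keyPrefix = "/";

    // With an s3a:// URI, the path is translated into the table's parent
    // key form; "s3a://ireland-1/path_prefix" is expected to map to
    // "/ireland-1/path_prefix".
    Path path = new Path("s3a://ireland-1/path_prefix");
    keyPrefix = PathMetadataDynamoDBTranslation.pathToParentKey(path);
    System.out.println(keyPrefix);
  }
}
```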
@@ -592,8 +592,8 @@ A time value of hours, minutes and/or seconds must be supplied.
 1. This does not delete the entries in the bucket itself.
 1. The modification time is effectively the creation time of the objects
 in the S3 Bucket.
-1. Even when an S3A URI is supplied, all entries in the table older than
-a specific age are deleted — even those from other buckets.
+1. If an S3A URI is supplied, only the entries in the table specified by the
+URI and older than a specific age are deleted.
 
 Example
 
@@ -604,6 +604,13 @@ hadoop s3guard prune -days 7 s3a://ireland-1
 Deletes all entries in the S3Guard table for files older than seven days from
 the table associated with `s3a://ireland-1`.
 
+```bash
+hadoop s3guard prune -days 7 s3a://ireland-1/path_prefix/
+```
+
+Deletes all entries in the S3Guard table for files older than seven days from
+the table associated with `s3a://ireland-1` and with the prefix "path_prefix"
+
 ```bash
 hadoop s3guard prune -hours 1 -minutes 30 -meta dynamodb://ireland-team -region eu-west-1
 ```
@@ -181,22 +181,26 @@ protected void createFile(Path path, boolean onS3, boolean onMetadataStore)
     }
   }
 
-  private void testPruneCommand(Configuration cmdConf, String...args)
-      throws Exception {
-    Path parent = path("prune-cli");
+  private void testPruneCommand(Configuration cmdConf, Path parent,
+      String...args) throws Exception {
+    Path keepParent = path("prune-cli-keep");
     try {
       getFileSystem().mkdirs(parent);
+      getFileSystem().mkdirs(keepParent);
 
       S3GuardTool.Prune cmd = new S3GuardTool.Prune(cmdConf);
       cmd.setMetadataStore(ms);
 
       createFile(new Path(parent, "stale"), true, true);
+      createFile(new Path(keepParent, "stale-to-keep"), true, true);
       Thread.sleep(TimeUnit.SECONDS.toMillis(2));
       createFile(new Path(parent, "fresh"), true, true);
 
       assertMetastoreListingCount(parent, "Children count before pruning", 2);
       exec(cmd, args);
       assertMetastoreListingCount(parent, "Pruned children count", 1);
+      assertMetastoreListingCount(keepParent,
+          "This child should have been kept (prefix restriction).", 1);
     } finally {
       getFileSystem().delete(parent, true);
       ms.prune(Long.MAX_VALUE);
@@ -213,17 +217,18 @@ private void assertMetastoreListingCount(Path parent,
 
   @Test
   public void testPruneCommandCLI() throws Exception {
-    String testPath = path("testPruneCommandCLI").toString();
-    testPruneCommand(getFileSystem().getConf(),
-        "prune", "-seconds", "1", testPath);
+    Path testPath = path("testPruneCommandCLI");
+    testPruneCommand(getFileSystem().getConf(), testPath,
+        "prune", "-seconds", "1", testPath.toString());
   }
 
   @Test
   public void testPruneCommandConf() throws Exception {
     getConfiguration().setLong(Constants.S3GUARD_CLI_PRUNE_AGE,
         TimeUnit.SECONDS.toMillis(1));
-    String testPath = path("testPruneCommandConf").toString();
-    testPruneCommand(getConfiguration(), "prune", testPath);
+    Path testPath = path("testPruneCommandConf");
+    testPruneCommand(getConfiguration(), testPath,
+        "prune", testPath.toString());
   }
 
   @Test