HADOOP-15957. WASB: Add asterisk wildcard support for PageBlobDirSet.
Contributed by Da Zhou.
This commit is contained in:
parent
c9bfca217f
commit
7ccb640a66
@ -44,6 +44,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper;
|
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper;
|
||||||
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
|
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
|
||||||
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
|
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
|
||||||
@ -241,6 +242,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||||||
private static final String HTTP_SCHEME = "http";
|
private static final String HTTP_SCHEME = "http";
|
||||||
private static final String HTTPS_SCHEME = "https";
|
private static final String HTTPS_SCHEME = "https";
|
||||||
private static final String WASB_AUTHORITY_DELIMITER = "@";
|
private static final String WASB_AUTHORITY_DELIMITER = "@";
|
||||||
|
private static final char ASTERISK_SYMBOL = '*';
|
||||||
private static final String AZURE_ROOT_CONTAINER = "$root";
|
private static final String AZURE_ROOT_CONTAINER = "$root";
|
||||||
|
|
||||||
private static final int DEFAULT_CONCURRENT_WRITES = 8;
|
private static final int DEFAULT_CONCURRENT_WRITES = 8;
|
||||||
@ -1169,7 +1171,7 @@ private Set<String> getDirectorySet(final String configVar)
|
|||||||
for (String currentDir : rawDirs) {
|
for (String currentDir : rawDirs) {
|
||||||
String myDir;
|
String myDir;
|
||||||
try {
|
try {
|
||||||
myDir = verifyAndConvertToStandardFormat(currentDir);
|
myDir = verifyAndConvertToStandardFormat(currentDir.trim());
|
||||||
} catch (URISyntaxException ex) {
|
} catch (URISyntaxException ex) {
|
||||||
throw new AzureException(String.format(
|
throw new AzureException(String.format(
|
||||||
"The directory %s specified in the configuration entry %s is not"
|
"The directory %s specified in the configuration entry %s is not"
|
||||||
@ -1214,7 +1216,12 @@ public boolean isAtomicRenameKey(String key) {
|
|||||||
public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
|
public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
|
||||||
String defaultFS = FileSystem.getDefaultUri(sessionConfiguration).toString();
|
String defaultFS = FileSystem.getDefaultUri(sessionConfiguration).toString();
|
||||||
for (String dir : dirSet) {
|
for (String dir : dirSet) {
|
||||||
if (dir.isEmpty() || key.startsWith(dir + "/")) {
|
if (dir.isEmpty()) {
|
||||||
|
// dir is root
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (matchAsteriskPattern(key, dir)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1227,7 +1234,8 @@ public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
|
|||||||
// Concatenate the default file system prefix with the relative
|
// Concatenate the default file system prefix with the relative
|
||||||
// page blob directory path.
|
// page blob directory path.
|
||||||
//
|
//
|
||||||
if (key.startsWith(trim(defaultFS, "/") + "/" + dir + "/")){
|
String dirWithPrefix = trim(defaultFS, "/") + "/" + dir;
|
||||||
|
if (matchAsteriskPattern(key, dirWithPrefix)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1238,6 +1246,54 @@ public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean matchAsteriskPattern(String pathName, String pattern) {
|
||||||
|
if (pathName == null || pathName.length() == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int pathIndex = 0;
|
||||||
|
int patternIndex = 0;
|
||||||
|
|
||||||
|
while (pathIndex < pathName.length() && patternIndex < pattern.length()) {
|
||||||
|
char charToMatch = pattern.charAt(patternIndex);
|
||||||
|
|
||||||
|
// normal char:
|
||||||
|
if (charToMatch != ASTERISK_SYMBOL) {
|
||||||
|
if (charToMatch != pathName.charAt(pathIndex)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
pathIndex++;
|
||||||
|
patternIndex++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ASTERISK_SYMBOL
|
||||||
|
// 1. * is used in path name: *a/b,a*/b, a/*b, a/b*
|
||||||
|
if (patternIndex > 0 && pattern.charAt(patternIndex - 1) != Path.SEPARATOR_CHAR
|
||||||
|
|| patternIndex + 1 < pattern.length() && pattern.charAt(patternIndex + 1) != Path.SEPARATOR_CHAR) {
|
||||||
|
if (ASTERISK_SYMBOL != pathName.charAt(pathIndex)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pathIndex++;
|
||||||
|
patternIndex++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. * is used as wildcard: */a, a/*/b, a/*
|
||||||
|
patternIndex++;
|
||||||
|
// find next path separator
|
||||||
|
while (pathIndex < pathName.length() && pathName.charAt(pathIndex) != Path.SEPARATOR_CHAR) {
|
||||||
|
pathIndex++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure it is not a file/dir which shares same prefix as pattern
|
||||||
|
// Eg: pattern: /A/B, pathName: /A/BBB should not match
|
||||||
|
return patternIndex == pattern.length()
|
||||||
|
&& (pathIndex == pathName.length() || pathName.charAt(pathIndex) == Path.SEPARATOR_CHAR);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the file block size. This is a fake value used for integration
|
* Returns the file block size. This is a fake value used for integration
|
||||||
* of the Azure store with Hadoop.
|
* of the Azure store with Hadoop.
|
||||||
|
@ -230,7 +230,7 @@ public void testIsPageBlobKey() {
|
|||||||
|
|
||||||
// negative tests
|
// negative tests
|
||||||
String[] negativeKeys = { "", "/", "bar", "bar/", "bar/pageBlobs", "bar/pageBlobs/foo",
|
String[] negativeKeys = { "", "/", "bar", "bar/", "bar/pageBlobs", "bar/pageBlobs/foo",
|
||||||
"bar/pageBlobs/foo/", "/pageBlobs/", "/pageBlobs", "pageBlobs", "pageBlobsxyz/" };
|
"bar/pageBlobs/foo/", "/pageBlobs/", "/pageBlobs", "pageBlobsxyz/" };
|
||||||
for (String s : negativeKeys) {
|
for (String s : negativeKeys) {
|
||||||
assertFalse(store.isPageBlobKey(s));
|
assertFalse(store.isPageBlobKey(s));
|
||||||
assertFalse(store.isPageBlobKey(uriPrefix + s));
|
assertFalse(store.isPageBlobKey(uriPrefix + s));
|
||||||
@ -262,7 +262,7 @@ public void testIsAtomicRenameKey() {
|
|||||||
|
|
||||||
// negative tests
|
// negative tests
|
||||||
String[] negativeKeys = { "", "/", "bar", "bar/", "bar/hbase",
|
String[] negativeKeys = { "", "/", "bar", "bar/", "bar/hbase",
|
||||||
"bar/hbase/foo", "bar/hbase/foo/", "/hbase/", "/hbase", "hbase",
|
"bar/hbase/foo", "bar/hbase/foo/", "/hbase/", "/hbase",
|
||||||
"hbasexyz/", "foo/atomicRenameDir1/"};
|
"hbasexyz/", "foo/atomicRenameDir1/"};
|
||||||
for (String s : negativeKeys) {
|
for (String s : negativeKeys) {
|
||||||
assertFalse(store.isAtomicRenameKey(s));
|
assertFalse(store.isAtomicRenameKey(s));
|
||||||
|
@ -0,0 +1,170 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azure;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test config property KEY_PAGE_BLOB_DIRECTORIES.
|
||||||
|
*/
|
||||||
|
public class TestKeyPageBlobDirectories extends AbstractWasbTestBase{
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||||
|
return AzureBlobStorageTestAccount.create();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void expectPageBlobKey(boolean expectedOutcome, AzureNativeFileSystemStore store, String path) {
|
||||||
|
assertEquals("Unexpected result for isPageBlobKey(" + path + ")",
|
||||||
|
expectedOutcome, store.isPageBlobKey(path));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKeySetWithoutAsterisk() throws Exception {
|
||||||
|
NativeAzureFileSystem azureFs = fs;
|
||||||
|
AzureNativeFileSystemStore store = azureFs.getStore();
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String dirList = "/service/WALs,/data/mypageblobfiles";
|
||||||
|
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, dirList);
|
||||||
|
URI uri = fs.getUri();
|
||||||
|
fs.initialize(uri, conf);
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "/");
|
||||||
|
expectPageBlobKey(false, store, "service");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "service/dir/recovered.edits");
|
||||||
|
expectPageBlobKey(true, store, "service/WALs/recovered.edits");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "data/dir/recovered.txt");
|
||||||
|
expectPageBlobKey(true, store, "data/mypageblobfiles/recovered.txt");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKeySetWithAsterisk() throws Exception {
|
||||||
|
NativeAzureFileSystem azureFs = fs;
|
||||||
|
AzureNativeFileSystemStore store = azureFs.getStore();
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String dirList = "/service/*/*/*/recovered.edits,/*/recovered.edits,/*/*/*/WALs, /*/*/oldWALs/*/*";
|
||||||
|
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, dirList);
|
||||||
|
URI uri = fs.getUri();
|
||||||
|
fs.initialize(uri, conf);
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "/");
|
||||||
|
expectPageBlobKey(false, store, "service");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "service/dir/recovered.edits");
|
||||||
|
expectPageBlobKey(true, store, "service/dir1/dir2/dir3/recovered.edits");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "data/dir/recovered.edits");
|
||||||
|
expectPageBlobKey(true, store, "data/recovered.edits");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "dir1/dir2/WALs/data");
|
||||||
|
expectPageBlobKey(true, store, "dir1/dir2/dir3/WALs/data1");
|
||||||
|
expectPageBlobKey(true, store, "dir1/dir2/dir3/WALs/data2");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "dir1/oldWALs/data");
|
||||||
|
expectPageBlobKey(false, store, "dir1/dir2/oldWALs/data");
|
||||||
|
expectPageBlobKey(true, store, "dir1/dir2/oldWALs/dir3/dir4/data");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKeySetUsingFullName() throws Exception {
|
||||||
|
NativeAzureFileSystem azureFs = fs;
|
||||||
|
AzureNativeFileSystemStore store = azureFs.getStore();
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String dirList = "/service/WALs,/data/mypageblobfiles,/*/*/WALs,/*/*/recover.edits";
|
||||||
|
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, dirList);
|
||||||
|
URI uri = fs.getUri();
|
||||||
|
fs.initialize(uri, conf);
|
||||||
|
|
||||||
|
final String defaultFS = FileSystem.getDefaultUri(conf).toString();
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, defaultFS + "service/recover.edits");
|
||||||
|
expectPageBlobKey(true, store, defaultFS + "service/WALs/recover.edits");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, defaultFS + "data/mismatch/mypageblobfiles/data");
|
||||||
|
expectPageBlobKey(true, store, defaultFS + "data/mypageblobfiles/data");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, defaultFS + "dir1/dir2/dir3/WALs/data");
|
||||||
|
expectPageBlobKey(true, store, defaultFS + "dir1/dir2/WALs/data");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, defaultFS + "dir1/dir2/dir3/recover.edits");
|
||||||
|
expectPageBlobKey(true, store, defaultFS + "dir1/dir2/recover.edits");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKeyContainsAsterisk() throws IOException {
|
||||||
|
NativeAzureFileSystem azureFs = fs;
|
||||||
|
AzureNativeFileSystemStore store = azureFs.getStore();
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
// Test dir name which contains *
|
||||||
|
String dirList = "/service/*/*/*/d*ir,/*/fi**le.data,/*/*/*/WALs*, /*/*/oldWALs";
|
||||||
|
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, dirList);
|
||||||
|
URI uri = fs.getUri();
|
||||||
|
fs.initialize(uri, conf);
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "/");
|
||||||
|
expectPageBlobKey(false, store, "service");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "service/d*ir/data");
|
||||||
|
expectPageBlobKey(true, store, "service/dir1/dir2/dir3/d*ir/data");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "dir/fi*le.data");
|
||||||
|
expectPageBlobKey(true, store, "dir/fi**le.data");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "dir1/dir2/WALs/data");
|
||||||
|
expectPageBlobKey(false, store, "dir1/dir2/dir3/WALs/data");
|
||||||
|
expectPageBlobKey(true, store, "dir1/dir2/dir3/WALs*/data1");
|
||||||
|
expectPageBlobKey(true, store, "dir1/dir2/dir3/WALs*/data2");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "dir1/oldWALs/data");
|
||||||
|
expectPageBlobKey(true, store, "dir1/dir2/oldWALs/data1");
|
||||||
|
expectPageBlobKey(true, store, "dir1/dir2/oldWALs/data2");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKeyWithCommonPrefix() throws IOException {
|
||||||
|
NativeAzureFileSystem azureFs = fs;
|
||||||
|
AzureNativeFileSystemStore store = azureFs.getStore();
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
// Test dir name which contains *
|
||||||
|
String dirList = "/service/WALs,/*/*/WALs";
|
||||||
|
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, dirList);
|
||||||
|
URI uri = fs.getUri();
|
||||||
|
fs.initialize(uri, conf);
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "/");
|
||||||
|
expectPageBlobKey(false, store, "service");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "service/WALsssssss/dir");
|
||||||
|
expectPageBlobKey(true, store, "service/WALs/dir");
|
||||||
|
|
||||||
|
expectPageBlobKey(false, store, "service/dir/WALsss/data");
|
||||||
|
expectPageBlobKey(true, store, "service/dir/WALs/data");
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user