HADOOP-14993. AliyunOSS: Override listFiles and listLocatedStatus. Contributed Genmao Yu

This commit is contained in:
Kai Zheng 2017-11-14 17:58:37 +08:00
parent 4f40cd314a
commit 18621af7ae
5 changed files with 309 additions and 15 deletions

View File

@ -28,14 +28,18 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
@ -46,6 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
@ -60,6 +65,12 @@ public class AliyunOSSFileSystem extends FileSystem {
private Path workingDir;
private AliyunOSSFileSystemStore store;
private int maxKeys;
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
public boolean accept(Path file) {
return true;
}
};
@Override
public FSDataOutputStream append(Path path, int bufferSize,
@ -301,18 +312,6 @@ public void initialize(URI name, Configuration conf) throws IOException {
setConf(conf);
}
/**
* Check if OSS object represents a directory.
*
* @param name object key
* @param size object content length
* @return true if object represents a directory
*/
private boolean objectRepresentsDirectory(final String name,
final long size) {
return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
}
/**
* Turn a path (relative or otherwise) into an OSS key.
*
@ -404,6 +403,58 @@ public FileStatus[] listStatus(Path path) throws IOException {
return result.toArray(new FileStatus[result.size()]);
}
@Override
public RemoteIterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive) throws IOException {
Path qualifiedPath = f.makeQualified(uri, workingDir);
final FileStatus status = getFileStatus(qualifiedPath);
PathFilter filter = new PathFilter() {
@Override
public boolean accept(Path path) {
return status.isFile() || !path.equals(f);
}
};
FileStatusAcceptor acceptor =
new FileStatusAcceptor.AcceptFilesOnly(qualifiedPath);
return innerList(f, status, filter, acceptor, recursive);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException {
return listLocatedStatus(f, DEFAULT_FILTER);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
final PathFilter filter) throws IOException {
Path qualifiedPath = f.makeQualified(uri, workingDir);
final FileStatus status = getFileStatus(qualifiedPath);
FileStatusAcceptor acceptor =
new FileStatusAcceptor.AcceptAllButSelf(qualifiedPath);
return innerList(f, status, filter, acceptor, false);
}
private RemoteIterator<LocatedFileStatus> innerList(final Path f,
final FileStatus status,
final PathFilter filter,
final FileStatusAcceptor acceptor,
final boolean recursive) throws IOException {
Path qualifiedPath = f.makeQualified(uri, workingDir);
String key = pathToKey(qualifiedPath);
if (status.isFile()) {
LOG.debug("{} is a File", qualifiedPath);
final BlockLocation[] locations = getFileBlockLocations(status,
0, status.getLen());
return store.singleStatusRemoteIterator(filter.accept(f) ? status : null,
locations);
} else {
return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
acceptor, recursive ? null : "/");
}
}
/**
* Used to create an empty file that represents an empty directory.
*

View File

@ -46,7 +46,13 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,6 +64,8 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
@ -546,4 +554,102 @@ public void purge(String prefix) throws IOException {
LOG.error("Failed to purge " + prefix);
}
}
public RemoteIterator<LocatedFileStatus> singleStatusRemoteIterator(
final FileStatus fileStatus, final BlockLocation[] locations) {
return new RemoteIterator<LocatedFileStatus>() {
private boolean hasNext = true;
@Override
public boolean hasNext() throws IOException {
return fileStatus != null && hasNext;
}
@Override
public LocatedFileStatus next() throws IOException {
if (hasNext()) {
LocatedFileStatus s = new LocatedFileStatus(fileStatus,
fileStatus.isFile() ? locations : null);
hasNext = false;
return s;
} else {
throw new NoSuchElementException();
}
}
};
}
public RemoteIterator<LocatedFileStatus> createLocatedFileStatusIterator(
final String prefix, final int maxListingLength, FileSystem fs,
PathFilter filter, FileStatusAcceptor acceptor, String delimiter) {
return new RemoteIterator<LocatedFileStatus>() {
private String nextMarker = null;
private boolean firstListing = true;
private boolean meetEnd = false;
private ListIterator<FileStatus> batchIterator;
@Override
public boolean hasNext() throws IOException {
if (firstListing) {
requestNextBatch();
firstListing = false;
}
return batchIterator.hasNext() || requestNextBatch();
}
@Override
public LocatedFileStatus next() throws IOException {
if (hasNext()) {
FileStatus status = batchIterator.next();
BlockLocation[] locations = fs.getFileBlockLocations(status,
0, status.getLen());
return new LocatedFileStatus(
status, status.isFile() ? locations : null);
} else {
throw new NoSuchElementException();
}
}
private boolean requestNextBatch() {
if (meetEnd) {
return false;
}
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(AliyunOSSUtils.maybeAddTrailingSlash(prefix));
listRequest.setMaxKeys(maxListingLength);
listRequest.setMarker(nextMarker);
listRequest.setDelimiter(delimiter);
ObjectListing listing = ossClient.listObjects(listRequest);
List<FileStatus> stats = new ArrayList<>(
listing.getObjectSummaries().size() +
listing.getCommonPrefixes().size());
for(OSSObjectSummary summary: listing.getObjectSummaries()) {
String key = summary.getKey();
Path path = fs.makeQualified(new Path("/" + key));
if (filter.accept(path) && acceptor.accept(path, summary)) {
FileStatus status = new FileStatus(summary.getSize(),
key.endsWith("/"), 1, fs.getDefaultBlockSize(path),
summary.getLastModified().getTime(), path);
stats.add(status);
}
}
for(String commonPrefix: listing.getCommonPrefixes()) {
Path path = fs.makeQualified(new Path("/" + commonPrefix));
if (filter.accept(path) && acceptor.accept(path, commonPrefix)) {
FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
stats.add(status);
}
}
batchIterator = stats.listIterator();
if (listing.isTruncated()) {
nextMarker = listing.getNextMarker();
} else {
meetEnd = true;
}
statistics.incrementReadOps(1);
return batchIterator.hasNext();
}
};
}
}

View File

@ -164,4 +164,16 @@ public static String maybeAddTrailingSlash(String key) {
return key;
}
}
/**
* Check if OSS object represents a directory.
*
* @param name object key
* @param size object content length
* @return true if object represents a directory
*/
public static boolean objectRepresentsDirectory(final String name,
final long size) {
return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.model.OSSObjectSummary;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
/**
* Interface to implement by the logic deciding whether to accept a summary
* entry or path as a valid file or directory.
*/
public interface FileStatusAcceptor {
/**
* Predicate to decide whether or not to accept a summary entry.
* @param keyPath qualified path to the entry
* @param summary summary entry
* @return true if the entry is accepted (i.e. that a status entry
* should be generated.
*/
boolean accept(Path keyPath, OSSObjectSummary summary);
/**
* Predicate to decide whether or not to accept a prefix.
* @param keyPath qualified path to the entry
* @param commonPrefix the prefix
* @return true if the entry is accepted (i.e. that a status entry
* should be generated.)
*/
boolean accept(Path keyPath, String commonPrefix);
/**
* Accept all entries except the base path.
*/
class AcceptFilesOnly implements FileStatusAcceptor {
private final Path qualifiedPath;
public AcceptFilesOnly(Path qualifiedPath) {
this.qualifiedPath = qualifiedPath;
}
/**
* Reject a summary entry if the key path is the qualified Path.
* @param keyPath key path of the entry
* @param summary summary entry
* @return true if the entry is accepted (i.e. that a status entry
* should be generated.
*/
@Override
public boolean accept(Path keyPath, OSSObjectSummary summary) {
return !keyPath.equals(qualifiedPath)
&& !objectRepresentsDirectory(summary.getKey(), summary.getSize());
}
/**
* Accept no directory paths.
* @param keyPath qualified path to the entry
* @param prefix common prefix in listing.
* @return false, always.
*/
@Override
public boolean accept(Path keyPath, String prefix) {
return false;
}
}
/**
* Accept all entries except the base path.
*/
class AcceptAllButSelf implements FileStatusAcceptor {
/** Base path. */
private final Path qualifiedPath;
/**
* Constructor.
* @param qualifiedPath an already-qualified path.
*/
public AcceptAllButSelf(Path qualifiedPath) {
this.qualifiedPath = qualifiedPath;
}
/**
* Reject a summary entry if the key path is the qualified Path.
* @param keyPath key path of the entry
* @param summary summary entry
* @return true if the entry is accepted (i.e. that a status entry
* should be generated.)
*/
@Override
public boolean accept(Path keyPath, OSSObjectSummary summary) {
return !keyPath.equals(qualifiedPath);
}
/**
* Accept all prefixes except the one for the base path, "self".
* @param keyPath qualified path to the entry
* @param prefix common prefix in listing.
* @return true if the entry is accepted (i.e. that a status entry
* should be generated.
*/
@Override
public boolean accept(Path keyPath, String prefix) {
return !keyPath.equals(qualifiedPath);
}
}
}

View File

@ -56,14 +56,14 @@ Authorization occurs at the level of the entire Aliyun account via
4. The append operation is not supported.
### Warning #2: Directory last access time is not tracked,
features of Hadoop relying on this can have unexpected behaviour. E.g. the
AggregatedLogDeletionService of YARN will not remove the appropriate logfiles.
Features of Hadoop relying on this can have unexpected behaviour. E.g. the
AggregatedLogDeletionService of YARN will not remove the appropriate log files.
### Warning #3: Your Aliyun credentials are valuable
Your Aliyun credentials not only pay for services, they offer read and write
access to the data. Anyone with the account can not only read your datasets
—they can delete them.
they can delete them.
Do not inadvertently share these credentials through means such as
1. Checking in to SCM any configuration files containing the secrets.