HADOOP-15943. AliyunOSS: add missing owner & group attributes for oss FileStatus. Contributed by wujinhu.
This commit is contained in:
parent
e223a790a7
commit
5ff0cf86a9
@ -45,6 +45,7 @@
|
|||||||
import org.apache.hadoop.fs.PathIOException;
|
import org.apache.hadoop.fs.PathIOException;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
@ -70,6 +71,7 @@ public class AliyunOSSFileSystem extends FileSystem {
|
|||||||
LoggerFactory.getLogger(AliyunOSSFileSystem.class);
|
LoggerFactory.getLogger(AliyunOSSFileSystem.class);
|
||||||
private URI uri;
|
private URI uri;
|
||||||
private String bucket;
|
private String bucket;
|
||||||
|
private String username;
|
||||||
private Path workingDir;
|
private Path workingDir;
|
||||||
private int blockOutputActiveBlocks;
|
private int blockOutputActiveBlocks;
|
||||||
private AliyunOSSFileSystemStore store;
|
private AliyunOSSFileSystemStore store;
|
||||||
@ -259,7 +261,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
|
|||||||
|
|
||||||
// Root always exists
|
// Root always exists
|
||||||
if (key.length() == 0) {
|
if (key.length() == 0) {
|
||||||
return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
|
return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username);
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectMetadata meta = store.getObjectMetadata(key);
|
ObjectMetadata meta = store.getObjectMetadata(key);
|
||||||
@ -273,17 +275,17 @@ public FileStatus getFileStatus(Path path) throws IOException {
|
|||||||
ObjectListing listing = store.listObjects(key, 1, null, false);
|
ObjectListing listing = store.listObjects(key, 1, null, false);
|
||||||
if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
|
if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
|
||||||
CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
|
CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
|
||||||
return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
|
return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username);
|
||||||
} else {
|
} else {
|
||||||
throw new FileNotFoundException(path + ": No such file or directory!");
|
throw new FileNotFoundException(path + ": No such file or directory!");
|
||||||
}
|
}
|
||||||
} else if (objectRepresentsDirectory(key, meta.getContentLength())) {
|
} else if (objectRepresentsDirectory(key, meta.getContentLength())) {
|
||||||
return new FileStatus(0, true, 1, 0, meta.getLastModified().getTime(),
|
return new OSSFileStatus(0, true, 1, 0, meta.getLastModified().getTime(),
|
||||||
qualifiedPath);
|
qualifiedPath, username);
|
||||||
} else {
|
} else {
|
||||||
return new FileStatus(meta.getContentLength(), false, 1,
|
return new OSSFileStatus(meta.getContentLength(), false, 1,
|
||||||
getDefaultBlockSize(path), meta.getLastModified().getTime(),
|
getDefaultBlockSize(path), meta.getLastModified().getTime(),
|
||||||
qualifiedPath);
|
qualifiedPath, username);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -330,15 +332,16 @@ public void initialize(URI name, Configuration conf) throws IOException {
|
|||||||
|
|
||||||
bucket = name.getHost();
|
bucket = name.getHost();
|
||||||
uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
|
uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
|
||||||
workingDir = new Path("/user",
|
// Username is the current user at the time the FS was instantiated.
|
||||||
System.getProperty("user.name")).makeQualified(uri, null);
|
username = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
workingDir = new Path("/user", username).makeQualified(uri, null);
|
||||||
long keepAliveTime = longOption(conf,
|
long keepAliveTime = longOption(conf,
|
||||||
KEEPALIVE_TIME_KEY, KEEPALIVE_TIME_DEFAULT, 0);
|
KEEPALIVE_TIME_KEY, KEEPALIVE_TIME_DEFAULT, 0);
|
||||||
blockOutputActiveBlocks = intOption(conf,
|
blockOutputActiveBlocks = intOption(conf,
|
||||||
UPLOAD_ACTIVE_BLOCKS_KEY, UPLOAD_ACTIVE_BLOCKS_DEFAULT, 1);
|
UPLOAD_ACTIVE_BLOCKS_KEY, UPLOAD_ACTIVE_BLOCKS_DEFAULT, 1);
|
||||||
|
|
||||||
store = new AliyunOSSFileSystemStore();
|
store = new AliyunOSSFileSystemStore();
|
||||||
store.initialize(name, conf, statistics);
|
store.initialize(name, conf, username, statistics);
|
||||||
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
|
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
|
||||||
|
|
||||||
int threadNum = AliyunOSSUtils.intPositiveOption(conf,
|
int threadNum = AliyunOSSUtils.intPositiveOption(conf,
|
||||||
@ -423,9 +426,9 @@ public FileStatus[] listStatus(Path path) throws IOException {
|
|||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Adding: fi: " + keyPath);
|
LOG.debug("Adding: fi: " + keyPath);
|
||||||
}
|
}
|
||||||
result.add(new FileStatus(objectSummary.getSize(), false, 1,
|
result.add(new OSSFileStatus(objectSummary.getSize(), false, 1,
|
||||||
getDefaultBlockSize(keyPath),
|
getDefaultBlockSize(keyPath),
|
||||||
objectSummary.getLastModified().getTime(), keyPath));
|
objectSummary.getLastModified().getTime(), keyPath, username));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,6 +82,7 @@
|
|||||||
public class AliyunOSSFileSystemStore {
|
public class AliyunOSSFileSystemStore {
|
||||||
public static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(AliyunOSSFileSystemStore.class);
|
LoggerFactory.getLogger(AliyunOSSFileSystemStore.class);
|
||||||
|
private String username;
|
||||||
private FileSystem.Statistics statistics;
|
private FileSystem.Statistics statistics;
|
||||||
private OSSClient ossClient;
|
private OSSClient ossClient;
|
||||||
private String bucketName;
|
private String bucketName;
|
||||||
@ -90,8 +91,9 @@ public class AliyunOSSFileSystemStore {
|
|||||||
private int maxKeys;
|
private int maxKeys;
|
||||||
private String serverSideEncryptionAlgorithm;
|
private String serverSideEncryptionAlgorithm;
|
||||||
|
|
||||||
public void initialize(URI uri, Configuration conf,
|
public void initialize(URI uri, Configuration conf, String user,
|
||||||
FileSystem.Statistics stat) throws IOException {
|
FileSystem.Statistics stat) throws IOException {
|
||||||
|
this.username = user;
|
||||||
statistics = stat;
|
statistics = stat;
|
||||||
ClientConfiguration clientConf = new ClientConfiguration();
|
ClientConfiguration clientConf = new ClientConfiguration();
|
||||||
clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
|
clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
|
||||||
@ -572,9 +574,9 @@ private boolean requestNextBatch() {
|
|||||||
String key = summary.getKey();
|
String key = summary.getKey();
|
||||||
Path path = fs.makeQualified(new Path("/" + key));
|
Path path = fs.makeQualified(new Path("/" + key));
|
||||||
if (filter.accept(path) && acceptor.accept(path, summary)) {
|
if (filter.accept(path) && acceptor.accept(path, summary)) {
|
||||||
FileStatus status = new FileStatus(summary.getSize(),
|
FileStatus status = new OSSFileStatus(summary.getSize(),
|
||||||
key.endsWith("/"), 1, fs.getDefaultBlockSize(path),
|
key.endsWith("/"), 1, fs.getDefaultBlockSize(path),
|
||||||
summary.getLastModified().getTime(), path);
|
summary.getLastModified().getTime(), path, username);
|
||||||
stats.add(status);
|
stats.add(status);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -582,7 +584,8 @@ private boolean requestNextBatch() {
|
|||||||
for (String commonPrefix : listing.getCommonPrefixes()) {
|
for (String commonPrefix : listing.getCommonPrefixes()) {
|
||||||
Path path = fs.makeQualified(new Path("/" + commonPrefix));
|
Path path = fs.makeQualified(new Path("/" + commonPrefix));
|
||||||
if (filter.accept(path) && acceptor.accept(path, commonPrefix)) {
|
if (filter.accept(path) && acceptor.accept(path, commonPrefix)) {
|
||||||
FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
|
FileStatus status = new OSSFileStatus(0, true, 1, 0, 0,
|
||||||
|
path, username);
|
||||||
stats.add(status);
|
stats.add(status);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,38 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is used by listStatus for oss files.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class OSSFileStatus extends FileStatus {
|
||||||
|
public OSSFileStatus(long length, boolean isdir, int blockReplication,
|
||||||
|
long blocksize, long modTime, Path path, String user) {
|
||||||
|
super(length, isdir, blockReplication, blocksize, modTime, path);
|
||||||
|
setOwner(user);
|
||||||
|
setGroup(user);
|
||||||
|
}
|
||||||
|
}
|
@ -24,6 +24,7 @@
|
|||||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -82,6 +83,18 @@ public void testRenameRootDirForbidden() throws Exception {
|
|||||||
false, true, false);
|
false, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListStatus() throws IOException {
|
||||||
|
Path file = this.path("/test/hadoop/file");
|
||||||
|
this.createFile(file);
|
||||||
|
assertTrue("File exists", this.fs.exists(file));
|
||||||
|
FileStatus fs = this.fs.getFileStatus(file);
|
||||||
|
assertEquals(fs.getOwner(),
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||||
|
assertEquals(fs.getGroup(),
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteSubdir() throws IOException {
|
public void testDeleteSubdir() throws IOException {
|
||||||
Path parentDir = this.path("/test/hadoop");
|
Path parentDir = this.path("/test/hadoop");
|
||||||
|
Loading…
Reference in New Issue
Block a user