HDFS-4525. Provide an API for knowing that whether file is closed or not. Contributed by SreeHari.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1465434 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7f13207ed1
commit
c5bb615317
@ -390,6 +390,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||||||
|
|
||||||
HDFS-4618. Default transaction interval for checkpoints is too low. (todd)
|
HDFS-4618. Default transaction interval for checkpoints is too low. (todd)
|
||||||
|
|
||||||
|
HDFS-4525. Provide an API for knowing that whether file is closed or not.
|
||||||
|
(SreeHari via umamahesh)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -1544,7 +1544,22 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
|
|||||||
UnresolvedPathException.class);
|
UnresolvedPathException.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close status of a file
|
||||||
|
* @return true if file is already closed
|
||||||
|
*/
|
||||||
|
public boolean isFileClosed(String src) throws IOException{
|
||||||
|
checkOpen();
|
||||||
|
try {
|
||||||
|
return namenode.isFileClosed(src);
|
||||||
|
} catch(RemoteException re) {
|
||||||
|
throw re.unwrapRemoteException(AccessControlException.class,
|
||||||
|
FileNotFoundException.class,
|
||||||
|
UnresolvedPathException.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the file info for a specific file or directory. If src
|
* Get the file info for a specific file or directory. If src
|
||||||
* refers to a symlink then the FileStatus of the link is returned.
|
* refers to a symlink then the FileStatus of the link is returned.
|
||||||
|
@ -917,4 +917,17 @@ protected URI canonicalizeUri(URI uri) {
|
|||||||
public boolean isInSafeMode() throws IOException {
|
public boolean isInSafeMode() throws IOException {
|
||||||
return setSafeMode(SafeModeAction.SAFEMODE_GET, true);
|
return setSafeMode(SafeModeAction.SAFEMODE_GET, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the close status of a file
|
||||||
|
* @param src The path to the file
|
||||||
|
*
|
||||||
|
* @return return true if file is closed
|
||||||
|
* @throws FileNotFoundException if the file does not exist.
|
||||||
|
* @throws IOException If an I/O error occurred
|
||||||
|
*/
|
||||||
|
public boolean isFileClosed(Path src) throws IOException {
|
||||||
|
return dfs.isFileClosed(getPathName(src));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -757,7 +757,21 @@ public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
|
|||||||
@Idempotent
|
@Idempotent
|
||||||
public HdfsFileStatus getFileInfo(String src) throws AccessControlException,
|
public HdfsFileStatus getFileInfo(String src) throws AccessControlException,
|
||||||
FileNotFoundException, UnresolvedLinkException, IOException;
|
FileNotFoundException, UnresolvedLinkException, IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the close status of a file
|
||||||
|
* @param src The string representation of the path to the file
|
||||||
|
*
|
||||||
|
* @return return true if file is closed
|
||||||
|
* @throws AccessControlException permission denied
|
||||||
|
* @throws FileNotFoundException file <code>src</code> is not found
|
||||||
|
* @throws UnresolvedLinkException if the path contains a symlink.
|
||||||
|
* @throws IOException If an I/O error occurred
|
||||||
|
*/
|
||||||
|
@Idempotent
|
||||||
|
public boolean isFileClosed(String src) throws AccessControlException,
|
||||||
|
FileNotFoundException, UnresolvedLinkException, IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the file info for a specific file or directory. If the path
|
* Get the file info for a specific file or directory. If the path
|
||||||
* refers to a symlink then the FileStatus of the symlink is returned.
|
* refers to a symlink then the FileStatus of the symlink is returned.
|
||||||
|
@ -76,6 +76,8 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
||||||
@ -864,4 +866,17 @@ public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
|
|||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IsFileClosedResponseProto isFileClosed(
|
||||||
|
RpcController controller, IsFileClosedRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
try {
|
||||||
|
boolean result = server.isFileClosed(request.getSrc());
|
||||||
|
return IsFileClosedResponseProto.newBuilder().setResult(result).build();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -77,6 +77,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
|
||||||
@ -850,6 +851,19 @@ public DataEncryptionKey getDataEncryptionKey() throws IOException {
|
|||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isFileClosed(String src) throws AccessControlException,
|
||||||
|
FileNotFoundException, UnresolvedLinkException, IOException {
|
||||||
|
IsFileClosedRequestProto req = IsFileClosedRequestProto.newBuilder()
|
||||||
|
.setSrc(src).build();
|
||||||
|
try {
|
||||||
|
return rpcProxy.isFileClosed(null, req).getResult();
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object getUnderlyingProxyObject() {
|
public Object getUnderlyingProxyObject() {
|
||||||
|
@ -2973,6 +2973,33 @@ HdfsFileStatus getFileInfo(String src, boolean resolveLink)
|
|||||||
logAuditEvent(true, "getfileinfo", src);
|
logAuditEvent(true, "getfileinfo", src);
|
||||||
return stat;
|
return stat;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if the file is closed
|
||||||
|
*/
|
||||||
|
boolean isFileClosed(String src)
|
||||||
|
throws AccessControlException, UnresolvedLinkException,
|
||||||
|
StandbyException, IOException {
|
||||||
|
FSPermissionChecker pc = getPermissionChecker();
|
||||||
|
checkOperation(OperationCategory.READ);
|
||||||
|
readLock();
|
||||||
|
try {
|
||||||
|
checkOperation(OperationCategory.READ);
|
||||||
|
if (isPermissionEnabled) {
|
||||||
|
checkTraverse(pc, src);
|
||||||
|
}
|
||||||
|
return !INodeFile.valueOf(dir.getINode(src), src).isUnderConstruction();
|
||||||
|
} catch (AccessControlException e) {
|
||||||
|
if (isAuditEnabled() && isExternalInvocation()) {
|
||||||
|
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
|
||||||
|
getRemoteIp(),
|
||||||
|
"isFileClosed", src, null, null);
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
readUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create all the necessary directories
|
* Create all the necessary directories
|
||||||
|
@ -690,7 +690,12 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
|
|||||||
metrics.incrFileInfoOps();
|
metrics.incrFileInfoOps();
|
||||||
return namesystem.getFileInfo(src, true);
|
return namesystem.getFileInfo(src, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override // ClientProtocol
|
||||||
|
public boolean isFileClosed(String src) throws IOException{
|
||||||
|
return namesystem.isFileClosed(src);
|
||||||
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
|
public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
|
||||||
metrics.incrFileInfoOps();
|
metrics.incrFileInfoOps();
|
||||||
|
@ -332,6 +332,14 @@ message GetFileInfoResponseProto {
|
|||||||
optional HdfsFileStatusProto fs = 1;
|
optional HdfsFileStatusProto fs = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message IsFileClosedRequestProto {
|
||||||
|
required string src = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message IsFileClosedResponseProto {
|
||||||
|
required bool result = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message GetFileLinkInfoRequestProto {
|
message GetFileLinkInfoRequestProto {
|
||||||
required string src = 1;
|
required string src = 1;
|
||||||
}
|
}
|
||||||
@ -498,4 +506,6 @@ service ClientNamenodeProtocol {
|
|||||||
returns(SetBalancerBandwidthResponseProto);
|
returns(SetBalancerBandwidthResponseProto);
|
||||||
rpc getDataEncryptionKey(GetDataEncryptionKeyRequestProto)
|
rpc getDataEncryptionKey(GetDataEncryptionKeyRequestProto)
|
||||||
returns(GetDataEncryptionKeyResponseProto);
|
returns(GetDataEncryptionKeyResponseProto);
|
||||||
|
rpc isFileClosed(IsFileClosedRequestProto)
|
||||||
|
returns(IsFileClosedResponseProto);
|
||||||
}
|
}
|
||||||
|
@ -762,4 +762,27 @@ public void testCreateWithCustomChecksum() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testFileCloseStatus() throws IOException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
try {
|
||||||
|
// create a new file.
|
||||||
|
Path file = new Path("/simpleFlush.dat");
|
||||||
|
FSDataOutputStream output = fs.create(file);
|
||||||
|
// write to file
|
||||||
|
output.writeBytes("Some test data");
|
||||||
|
output.flush();
|
||||||
|
assertFalse("File status should be open", fs.isFileClosed(file));
|
||||||
|
output.close();
|
||||||
|
assertTrue("File status should be closed", fs.isFileClosed(file));
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user