HDFS-10549. Correctly revoke file leases when closing files. Contributed by Yiqun Lin.
This commit is contained in:
parent
c57523163f
commit
2aa5e2c403
@ -463,7 +463,7 @@ private void beginFileLease(final long inodeId, final DFSOutputStream out)
|
||||
}
|
||||
|
||||
/** Stop renewal of lease for the file. */
|
||||
void endFileLease(final long inodeId) throws IOException {
|
||||
void endFileLease(final long inodeId) {
|
||||
getLeaseRenewer().closeFile(inodeId, this);
|
||||
}
|
||||
|
||||
|
@ -57,6 +57,7 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
@ -732,6 +733,7 @@ protected synchronized void start() {
|
||||
* resources associated with this stream.
|
||||
*/
|
||||
void abort() throws IOException {
|
||||
final MultipleIOException.Builder b = new MultipleIOException.Builder();
|
||||
synchronized (this) {
|
||||
if (isClosed()) {
|
||||
return;
|
||||
@ -740,9 +742,19 @@ void abort() throws IOException {
|
||||
new IOException("Lease timeout of "
|
||||
+ (dfsClient.getConf().getHdfsTimeout() / 1000)
|
||||
+ " seconds expired."));
|
||||
|
||||
try {
|
||||
closeThreads(true);
|
||||
} catch (IOException e) {
|
||||
b.add(e);
|
||||
}
|
||||
}
|
||||
|
||||
dfsClient.endFileLease(fileId);
|
||||
final IOException ioe = b.build();
|
||||
if (ioe != null) {
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
||||
boolean isClosed() {
|
||||
@ -775,13 +787,21 @@ protected void closeThreads(boolean force) throws IOException {
|
||||
*/
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
final MultipleIOException.Builder b = new MultipleIOException.Builder();
|
||||
synchronized (this) {
|
||||
try (TraceScope ignored = dfsClient.newPathTraceScope(
|
||||
"DFSOutputStream#close", src)) {
|
||||
closeImpl();
|
||||
} catch (IOException e) {
|
||||
b.add(e);
|
||||
}
|
||||
}
|
||||
|
||||
dfsClient.endFileLease(fileId);
|
||||
final IOException ioe = b.build();
|
||||
if (ioe != null) {
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized void closeImpl() throws IOException {
|
||||
|
@ -798,6 +798,7 @@ protected synchronized void start() {
|
||||
|
||||
@Override
|
||||
void abort() throws IOException {
|
||||
final MultipleIOException.Builder b = new MultipleIOException.Builder();
|
||||
synchronized (this) {
|
||||
if (isClosed()) {
|
||||
return;
|
||||
@ -808,9 +809,19 @@ void abort() throws IOException {
|
||||
+ (dfsClient.getConf().getHdfsTimeout() / 1000)
|
||||
+ " seconds expired."));
|
||||
}
|
||||
|
||||
try {
|
||||
closeThreads(true);
|
||||
} catch (IOException e) {
|
||||
b.add(e);
|
||||
}
|
||||
}
|
||||
|
||||
dfsClient.endFileLease(fileId);
|
||||
final IOException ioe = b.build();
|
||||
if (ioe != null) {
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,6 +71,7 @@
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
@ -96,7 +97,6 @@
|
||||
import org.junit.Test;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -1373,4 +1373,37 @@ public void testTotalDfsUsed() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDFSCloseFilesBeingWritten() throws Exception {
|
||||
Configuration conf = getTestConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
DistributedFileSystem fileSys = cluster.getFileSystem();
|
||||
|
||||
// Create one file then delete it to trigger the FileNotFoundException
|
||||
// when closing the file.
|
||||
fileSys.create(new Path("/test/dfsclose/file-0"));
|
||||
fileSys.delete(new Path("/test/dfsclose/file-0"), true);
|
||||
|
||||
DFSClient dfsClient = fileSys.getClient();
|
||||
// Construct a new dfsClient to get the same LeaseRenewer instance,
|
||||
// to avoid the original client being added to the leaseRenewer again.
|
||||
DFSClient newDfsClient =
|
||||
new DFSClient(cluster.getFileSystem(0).getUri(), conf);
|
||||
LeaseRenewer leaseRenewer = newDfsClient.getLeaseRenewer();
|
||||
|
||||
dfsClient.closeAllFilesBeingWritten(false);
|
||||
// Remove new dfsClient in leaseRenewer
|
||||
leaseRenewer.closeClient(newDfsClient);
|
||||
|
||||
// The list of clients corresponding to this renewer should be empty
|
||||
assertEquals(true, leaseRenewer.isEmpty());
|
||||
assertEquals(true, dfsClient.isFilesBeingWrittenEmpty());
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user