diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 833b1eef6b..ad8689567e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -463,7 +463,7 @@ private void beginFileLease(final long inodeId, final DFSOutputStream out) } /** Stop renewal of lease for the file. */ - void endFileLease(final long inodeId) throws IOException { + void endFileLease(final long inodeId) { getLeaseRenewer().closeFile(inodeId, this); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 93aee0e46f..a73ab95b72 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; @@ -732,6 +733,7 @@ protected synchronized void start() { * resources associated with this stream. */ void abort() throws IOException { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); synchronized (this) { if (isClosed()) { return; @@ -740,9 +742,19 @@ void abort() throws IOException { new IOException("Lease timeout of " + (dfsClient.getConf().getHdfsTimeout() / 1000) + " seconds expired.")); - closeThreads(true); + + try { + closeThreads(true); + } catch (IOException e) { + b.add(e); + } } + dfsClient.endFileLease(fileId); + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } } boolean isClosed() { @@ -775,13 +787,21 @@ protected void closeThreads(boolean force) throws IOException { */ @Override public void close() throws IOException { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); synchronized (this) { try (TraceScope ignored = dfsClient.newPathTraceScope( "DFSOutputStream#close", src)) { closeImpl(); + } catch (IOException e) { + b.add(e); } } + dfsClient.endFileLease(fileId); + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } } protected synchronized void closeImpl() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 502e0a5626..d5d0dfbbf5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -798,6 +798,7 @@ protected synchronized void start() { @Override void abort() throws IOException { + final MultipleIOException.Builder b = new MultipleIOException.Builder(); synchronized (this) { if (isClosed()) { return; @@ -808,9 +809,19 @@ void abort() throws IOException { + (dfsClient.getConf().getHdfsTimeout() / 1000) + " seconds expired.")); } - closeThreads(true); + + try { + closeThreads(true); + } catch (IOException e) { + b.add(e); + } } + dfsClient.endFileLease(fileId); + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 50f9f36b23..69248d98fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -71,6 +71,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -96,7 +97,6 @@ import org.junit.Test; import org.mockito.InOrder; import org.mockito.internal.util.reflection.Whitebox; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1373,4 +1373,37 @@ public void testTotalDfsUsed() throws Exception { } } + @Test + public void testDFSCloseFilesBeingWritten() throws Exception { + Configuration conf = getTestConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + DistributedFileSystem fileSys = cluster.getFileSystem(); + + // Create one file then delete it to trigger the FileNotFoundException + // when closing the file. + fileSys.create(new Path("/test/dfsclose/file-0")); + fileSys.delete(new Path("/test/dfsclose/file-0"), true); + + DFSClient dfsClient = fileSys.getClient(); + // Construct a new dfsClient to get the same LeaseRenewer instance, + // to avoid the original client being added to the leaseRenewer again. + DFSClient newDfsClient = + new DFSClient(cluster.getFileSystem(0).getUri(), conf); + LeaseRenewer leaseRenewer = newDfsClient.getLeaseRenewer(); + + dfsClient.closeAllFilesBeingWritten(false); + // Remove new dfsClient in leaseRenewer + leaseRenewer.closeClient(newDfsClient); + + // The list of clients corresponding to this renewer should be empty + assertEquals(true, leaseRenewer.isEmpty()); + assertEquals(true, dfsClient.isFilesBeingWrittenEmpty()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } }