diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index b9cbef07e1..f820e5f42c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -923,7 +923,7 @@ protected synchronized void closeImpl() throws IOException { * If recoverLeaseOnCloseException is true and an exception occurs when * closing a file, recover lease. */ - private void recoverLease(boolean recoverLeaseOnCloseException) { + protected void recoverLease(boolean recoverLeaseOnCloseException) { if (recoverLeaseOnCloseException) { try { dfsClient.endFileLease(fileId); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 7c3965647c..ce89a0fac2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -73,6 +73,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY; /** * This class supports writing files in striped layout and erasure coded format. @@ -1200,6 +1202,9 @@ void setClosed() { @Override protected synchronized void closeImpl() throws IOException { + boolean recoverLeaseOnCloseException = dfsClient.getConfiguration() + .getBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, + RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT); try { if (isClosed()) { exceptionLastSeen.check(true); @@ -1272,6 +1277,9 @@ protected synchronized void closeImpl() throws IOException { } logCorruptBlocks(); } catch (ClosedChannelException ignored) { + } catch (IOException ioe) { + recoverLease(recoverLeaseOnCloseException); + throw ioe; } finally { setClosed(); // shutdown executor of flushAll tasks diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index 092aa0aaca..79112af12c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -17,16 +17,22 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doThrow; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.EnumSet; +import java.util.concurrent.TimeoutException; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.permission.FsPermission; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -247,4 +253,75 @@ public void testFileBlockSizeSmallerThanCellSize() throws Exception { .assertExceptionContains("less than the cell size", expected); } } + + @Test + public void testExceptionInCloseECFileWithRecoverLease() throws Exception { + Configuration config = new Configuration(); + config.setBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, true); + DFSClient client = + new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), config); + DFSClient spyClient = Mockito.spy(client); + DFSOutputStream dfsOutputStream = + spyClient.create("/testExceptionInCloseECFileWithRecoverLease", + FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), + (short) 3, 1024*1024, null, 1024, null); + assertTrue("stream should be a DFSStripedOutputStream", + dfsOutputStream instanceof DFSStripedOutputStream); + DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream); + doThrow(new IOException("Emulated IOException in close")) + .when(spyDFSOutputStream).completeFile(Mockito.any()); + try { + spyDFSOutputStream.close(); + fail(); + } catch (IOException ioe) { + assertTrue(spyDFSOutputStream.isLeaseRecovered()); + waitForFileClosed("/testExceptionInCloseECFileWithRecoverLease"); + assertTrue(isFileClosed("/testExceptionInCloseECFileWithRecoverLease")); + } + } + + @Test + public void testExceptionInCloseECFileWithoutRecoverLease() throws Exception { + Configuration config = new Configuration(); + DFSClient client = + new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), config); + DFSClient spyClient = Mockito.spy(client); + DFSOutputStream dfsOutputStream = + spyClient.create("/testExceptionInCloseECFileWithoutRecoverLease", + FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), + (short) 3, 1024*1024, null, 1024, null); + assertTrue("stream should be a DFSStripedOutputStream", + dfsOutputStream instanceof DFSStripedOutputStream); + DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream); + doThrow(new IOException("Emulated IOException in close")) + .when(spyDFSOutputStream).completeFile(Mockito.any()); + try { + spyDFSOutputStream.close(); + fail(); + } catch (IOException ioe) { + assertFalse(spyDFSOutputStream.isLeaseRecovered()); + try { + waitForFileClosed("/testExceptionInCloseECFileWithoutRecoverLease"); + } catch (TimeoutException e) { + assertFalse( + isFileClosed("/testExceptionInCloseECFileWithoutRecoverLease")); + } + } + } + + private boolean isFileClosed(String path) throws IOException { + return cluster.getFileSystem().isFileClosed(new Path(path)); + } + + private void waitForFileClosed(String path) throws Exception { + GenericTestUtils.waitFor(() -> { + boolean closed; + try { + closed = isFileClosed(path); + } catch (IOException e) { + return false; + } + return closed; + }, 1000, 5000); + } }