diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a9e44cd5c0..42c1c85732 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -75,6 +75,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
 
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
@@ -126,6 +128,7 @@ public class DFSOutputStream extends FSOutputSummer
   protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
   private int writePacketSize;
+  private boolean leaseRecovered = false;
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -861,7 +864,14 @@ public void close() throws IOException {
   }
 
   protected synchronized void closeImpl() throws IOException {
+    boolean recoverLeaseOnCloseException = dfsClient.getConfiguration()
+        .getBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY,
+            RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT);
     if (isClosed()) {
+      if (!leaseRecovered) {
+        recoverLease(recoverLeaseOnCloseException);
+      }
+
       LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
           closed, getStreamer().streamerClosed());
       try {
@@ -896,6 +906,9 @@ protected synchronized void closeImpl() throws IOException {
       }
       completeFile();
     } catch (ClosedChannelException ignored) {
+    } catch (IOException ioe) {
+      recoverLease(recoverLeaseOnCloseException);
+      throw ioe;
     } finally {
       // Failures may happen when flushing data.
       // Streamers may keep waiting for the new block information.
@@ -906,7 +919,23 @@ protected synchronized void closeImpl() throws IOException {
     }
   }
 
-  private void completeFile() throws IOException {
+  /**
+   * If recoverLeaseOnCloseException is true and an exception occurs when
+   * closing a file, recover the lease.
+   */
+  private void recoverLease(boolean recoverLeaseOnCloseException) {
+    if (recoverLeaseOnCloseException) {
+      try {
+        dfsClient.endFileLease(fileId);
+        dfsClient.recoverLease(src);
+        leaseRecovered = true;
+      } catch (Exception e) {
+        LOG.warn("Failed to recover lease for {}", src, e);
+      }
+    }
+  }
+
+  void completeFile() throws IOException {
     // get last block before destroying the streamer
     ExtendedBlock lastBlock = getStreamer().getBlock();
     try (TraceScope ignored =
@@ -1076,6 +1105,11 @@ public String toString() {
     return getClass().getSimpleName() + ":" + streamer;
   }
 
+  @VisibleForTesting
+  boolean isLeaseRecovered() {
+    return leaseRecovered;
+  }
+
   static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
       DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
       String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index e8b540286c..8561caba2a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -362,6 +362,9 @@ interface Write {
     String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
         PREFIX + "exclude.nodes.cache.expiry.interval.millis";
     long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
+    String RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY =
+        PREFIX + "recover.lease.on.close.exception";
+    boolean RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT = false;
 
     interface ByteArrayManager {
       String PREFIX = Write.PREFIX + "byte-array-manager.";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 1891956dc0..e263a8e273 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -30,6 +30,7 @@
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
@@ -62,7 +63,10 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import org.mockito.Mockito;
@@ -371,10 +375,75 @@ public void testStreamFlush() throws Exception {
     os.close();
   }
 
+  @Test
+  public void testExceptionInCloseWithRecoverLease() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, true);
+    DFSClient client =
+        new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
+    DFSClient spyClient = Mockito.spy(client);
+    DFSOutputStream dfsOutputStream = spyClient.create(
+        "/testExceptionInCloseWithRecoverLease", FsPermission.getFileDefault(),
+        EnumSet.of(CreateFlag.CREATE), (short) 3, 1024, null, 1024, null);
+    DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
+    doThrow(new IOException("Emulated IOException in close"))
+        .when(spyDFSOutputStream).completeFile();
+    try {
+      spyDFSOutputStream.close();
+      fail();
+    } catch (IOException ioe) {
+      assertTrue(spyDFSOutputStream.isLeaseRecovered());
+      waitForFileClosed("/testExceptionInCloseWithRecoverLease");
+      assertTrue(isFileClosed("/testExceptionInCloseWithRecoverLease"));
+    }
+  }
+
+  @Test
+  public void testExceptionInCloseWithoutRecoverLease() throws Exception {
+    Configuration conf = new Configuration();
+    DFSClient client =
+        new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
+    DFSClient spyClient = Mockito.spy(client);
+    DFSOutputStream dfsOutputStream =
+        spyClient.create("/testExceptionInCloseWithoutRecoverLease",
+            FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
+            (short) 3, 1024, null, 1024, null);
+    DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
+    doThrow(new IOException("Emulated IOException in close"))
+        .when(spyDFSOutputStream).completeFile();
+    try {
+      spyDFSOutputStream.close();
+      fail();
+    } catch (IOException ioe) {
+      assertFalse(spyDFSOutputStream.isLeaseRecovered());
+      try {
+        waitForFileClosed("/testExceptionInCloseWithoutRecoverLease");
+      } catch (TimeoutException e) {
+        assertFalse(isFileClosed("/testExceptionInCloseWithoutRecoverLease"));
+      }
+    }
+  }
+
   @AfterClass
   public static void tearDown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
+
+  private boolean isFileClosed(String path) throws IOException {
+    return cluster.getFileSystem().isFileClosed(new Path(path));
+  }
+
+  private void waitForFileClosed(String path) throws Exception {
+    GenericTestUtils.waitFor(() -> {
+      boolean closed;
+      try {
+        closed = isFileClosed(path);
+      } catch (IOException e) {
+        return false;
+      }
+      return closed;
+    }, 1000, 5000);
+  }
 }
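For reviewers who want to exercise the new flag end to end, here is a minimal client-side sketch (not part of the patch). It assumes Write.PREFIX resolves to "dfs.client.write.", making the full property name dfs.client.write.recover.lease.on.close.exception, and that fs.defaultFS in the loaded configuration already points at the target HDFS cluster; the path and payload are illustrative only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RecoverLeaseOnCloseExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Opt in to the behavior added by this patch; the shipped default is false.
    conf.setBoolean("dfs.client.write.recover.lease.on.close.exception", true);
    try (FileSystem fs = FileSystem.get(conf)) {
      FSDataOutputStream out = fs.create(new Path("/tmp/recover-lease-example"));
      out.writeBytes("hello");
      // If close() fails with an IOException, the stream now ends its file
      // lease and asks the NameNode to recover it before rethrowing, so the
      // file does not stay stuck open under this client's lease.
      out.close();
    }
  }
}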