From fbd88f1062f3c4b208724d208e3f501eb196dfab Mon Sep 17 00:00:00 2001
From: Ming Ma
Date: Thu, 16 Jul 2015 12:33:57 -0700
Subject: [PATCH] HDFS-7314. When the DFSClient lease cannot be renewed, abort
 open-for-write files rather than the entire DFSClient. (mingma)

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt       |  3 +
 .../org/apache/hadoop/hdfs/DFSClient.java         | 16 +----
 .../hadoop/hdfs/client/impl/LeaseRenewer.java     | 12 +++-
 .../hadoop/hdfs/TestDFSClientRetries.java         | 66 ++++++++++++++++++-
 4 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8f6dd41246..c6685e1bfe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -165,6 +165,9 @@ Trunk (Unreleased)
     HDFS-5033. Bad error message for fs -put/copyFromLocal if user
     doesn't have permissions to read the source (Darrell Taylor via aw)
 
+    HDFS-7314. When the DFSClient lease cannot be renewed, abort open-for-write
+    files rather than the entire DFSClient. (mingma)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 6629a83b36..6f9e613605 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -567,23 +567,9 @@ public boolean renewLease() throws IOException {
   void closeConnectionToNamenode() {
     RPC.stopProxy(namenode);
   }
-
-  /** Abort and release resources held. Ignore all errors. */
-  public void abort() {
-    clientRunning = false;
-    closeAllFilesBeingWritten(true);
-    try {
-      // remove reference to this client and stop the renewer,
-      // if there is no more clients under the renewer.
-      getLeaseRenewer().closeClient(this);
-    } catch (IOException ioe) {
-      LOG.info("Exception occurred while aborting the client " + ioe);
-    }
-    closeConnectionToNamenode();
-  }
 
   /** Close/abort all files being written. */
-  private void closeAllFilesBeingWritten(final boolean abort) {
+  public void closeAllFilesBeingWritten(final boolean abort) {
     for(;;) {
       final long inodeId;
       final DFSOutputStream out;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
index 99323bb5ae..c689b732cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -215,6 +215,12 @@ private synchronized long getRenewalTime() {
     return renewal;
   }
 
+  /** Used for testing only. */
+  @VisibleForTesting
+  public synchronized void setRenewalTime(final long renewal) {
+    this.renewal = renewal;
+  }
+
   /** Add a client. */
   private synchronized void addClient(final DFSClient dfsc) {
     for(DFSClient c : dfsclients) {
@@ -453,8 +459,12 @@ private void run(final int id) throws InterruptedException {
             + (elapsed/1000) + " seconds. Aborting ...", ie);
         synchronized (this) {
           while (!dfsclients.isEmpty()) {
-            dfsclients.get(0).abort();
+            DFSClient dfsClient = dfsclients.get(0);
+            dfsClient.closeAllFilesBeingWritten(true);
+            closeClient(dfsClient);
           }
+          //Expire the current LeaseRenewer thread.
+          emptyTime = 0;
         }
         break;
       } catch (IOException ie) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 43e0eb921f..441ef9c501 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -31,6 +31,7 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.when;
 
 import java.io.FileNotFoundException;
@@ -62,6 +63,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.client.HdfsUtils;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -354,7 +356,59 @@ public void testFailuresArePerOperation() throws Exception
       cluster.shutdown();
     }
   }
-
+
+  /**
+   * Test DFSClient can continue to function after renewLease RPC
+   * receives SocketTimeoutException.
+   */
+  @Test
+  public void testLeaseRenewSocketTimeout() throws Exception
+  {
+    String file1 = "/testFile1";
+    String file2 = "/testFile2";
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
+    conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2 * 1000);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc());
+      Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease(
+          Mockito.anyString());
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+      // Get hold of the lease renewer instance used by the client
+      LeaseRenewer leaseRenewer = client.getLeaseRenewer();
+      leaseRenewer.setRenewalTime(100);
+      OutputStream out1 = client.create(file1, false);
+
+      Mockito.verify(spyNN, timeout(10000).times(1)).renewLease(
+          Mockito.anyString());
+      verifyEmptyLease(leaseRenewer);
+      try {
+        out1.write(new byte[256]);
+        fail("existing output stream should be aborted");
+      } catch (IOException e) {
+      }
+
+      // Verify DFSClient can do read operation after renewLease aborted.
+      client.exists(file2);
+      // Verify DFSClient can do write operation after renewLease no longer
+      // throws SocketTimeoutException.
+      Mockito.doNothing().when(spyNN).renewLease(
+          Mockito.anyString());
+      leaseRenewer = client.getLeaseRenewer();
+      leaseRenewer.setRenewalTime(100);
+      OutputStream out2 = client.create(file2, false);
+      Mockito.verify(spyNN, timeout(10000).times(2)).renewLease(
+          Mockito.anyString());
+      out2.write(new byte[256]);
+      out2.close();
+      verifyEmptyLease(leaseRenewer);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Test that getAdditionalBlock() and close() are idempotent. This allows
    * a client to safely retry a call and still produce a correct
    * answer.
@@ -673,7 +727,15 @@ private boolean busyTest(int xcievers, int threads, int fileLen, int timeWin, in
     }
     return ret;
   }
-
+
+  private void verifyEmptyLease(LeaseRenewer leaseRenewer) throws Exception {
+    int sleepCount = 0;
+    while (!leaseRenewer.isEmpty() && sleepCount++ < 20) {
+      Thread.sleep(500);
+    }
+    assertTrue("Lease should be empty.", leaseRenewer.isEmpty());
+  }
+
   class DFSClientReader implements Runnable {
     DFSClient client;
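
The behavior this patch establishes can be summarized from an application's point of view with the minimal sketch below. It is illustrative only and not part of the patch: the class name LeaseAbortBehaviorSketch and the paths /tmp/openFile and /tmp/newFile are made up, and it assumes fs.defaultFS points at a reachable HDFS cluster. When lease renewal keeps failing, only the streams that were open for write are aborted; the same FileSystem/DFSClient instance remains usable for metadata operations and for new writes once renewLease() succeeds again.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Illustrative sketch only (not part of the patch): what a client
 * application observes under HDFS-7314. Assumes fs.defaultFS points at a
 * reachable HDFS cluster; file paths are hypothetical.
 */
public class LeaseAbortBehaviorSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    FSDataOutputStream out = fs.create(new Path("/tmp/openFile"));

    // ... suppose lease renewal times out repeatedly at this point; with
    // this patch the renewer aborts only the open-for-write streams, not
    // the whole client ...

    try {
      out.write(new byte[256]);   // write on the aborted stream fails
    } catch (IOException expected) {
      System.out.println("open stream was aborted: " + expected);
    }

    // The FileSystem/DFSClient instance itself is still usable.
    boolean exists = fs.exists(new Path("/tmp/openFile"));
    System.out.println("metadata operations still work: " + exists);

    // Once renewLease() succeeds again, new writes go through normally.
    try (FSDataOutputStream out2 = fs.create(new Path("/tmp/newFile"))) {
      out2.write(new byte[256]);
    }
  }
}

Before this change, the renewer called DFSClient#abort(), which set clientRunning to false (see the removed block in DFSClient.java above), so even the metadata call in the sketch would typically have failed with a "Filesystem closed" IOException.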