HDFS-7314. When the DFSClient lease cannot be renewed, abort open-for-write files rather than the entire DFSClient. (mingma)

This commit is contained in:
Ming Ma 2015-07-16 12:33:57 -07:00
parent 1ba2986dee
commit fbd88f1062
4 changed files with 79 additions and 18 deletions

View File

@ -165,6 +165,9 @@ Trunk (Unreleased)
HDFS-5033. Bad error message for fs -put/copyFromLocal if user
doesn't have permissions to read the source (Darrell Taylor via aw)
HDFS-7314. When the DFSClient lease cannot be renewed, abort open-for-write
files rather than the entire DFSClient. (mingma)
OPTIMIZATIONS
BUG FIXES

View File

@ -568,22 +568,8 @@ void closeConnectionToNamenode() {
RPC.stopProxy(namenode);
}
/** Abort and release resources held. Ignore all errors. */
public void abort() {
clientRunning = false;
closeAllFilesBeingWritten(true);
try {
// remove reference to this client and stop the renewer,
// if there is no more clients under the renewer.
getLeaseRenewer().closeClient(this);
} catch (IOException ioe) {
LOG.info("Exception occurred while aborting the client " + ioe);
}
closeConnectionToNamenode();
}
/** Close/abort all files being written. */
private void closeAllFilesBeingWritten(final boolean abort) {
public void closeAllFilesBeingWritten(final boolean abort) {
for(;;) {
final long inodeId;
final DFSOutputStream out;

View File

@ -215,6 +215,12 @@ private synchronized long getRenewalTime() {
return renewal;
}
/** Used for testing only. */
@VisibleForTesting
public synchronized void setRenewalTime(final long renewal) {
this.renewal = renewal;
}
/** Add a client. */
private synchronized void addClient(final DFSClient dfsc) {
for(DFSClient c : dfsclients) {
@ -453,8 +459,12 @@ private void run(final int id) throws InterruptedException {
+ (elapsed/1000) + " seconds. Aborting ...", ie);
synchronized (this) {
while (!dfsclients.isEmpty()) {
dfsclients.get(0).abort();
DFSClient dfsClient = dfsclients.get(0);
dfsClient.closeAllFilesBeingWritten(true);
closeClient(dfsClient);
}
//Expire the current LeaseRenewer thread.
emptyTime = 0;
}
break;
} catch (IOException ie) {

View File

@ -31,6 +31,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
@ -62,6 +63,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.client.HdfsUtils;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@ -355,6 +357,58 @@ public void testFailuresArePerOperation() throws Exception
}
}
/**
* Test DFSClient can continue to function after renewLease RPC
* receives SocketTimeoutException.
*/
@Test
public void testLeaseRenewSocketTimeout() throws Exception
{
String file1 = "/testFile1";
String file2 = "/testFile2";
// Set short retry timeouts so this test runs faster
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2 * 1000);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc());
Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease(
Mockito.anyString());
DFSClient client = new DFSClient(null, spyNN, conf, null);
// Get hold of the lease renewer instance used by the client
LeaseRenewer leaseRenewer = client.getLeaseRenewer();
leaseRenewer.setRenewalTime(100);
OutputStream out1 = client.create(file1, false);
Mockito.verify(spyNN, timeout(10000).times(1)).renewLease(
Mockito.anyString());
verifyEmptyLease(leaseRenewer);
try {
out1.write(new byte[256]);
fail("existing output stream should be aborted");
} catch (IOException e) {
}
// Verify DFSClient can do read operation after renewLease aborted.
client.exists(file2);
// Verify DFSClient can do write operation after renewLease no longer
// throws SocketTimeoutException.
Mockito.doNothing().when(spyNN).renewLease(
Mockito.anyString());
leaseRenewer = client.getLeaseRenewer();
leaseRenewer.setRenewalTime(100);
OutputStream out2 = client.create(file2, false);
Mockito.verify(spyNN, timeout(10000).times(2)).renewLease(
Mockito.anyString());
out2.write(new byte[256]);
out2.close();
verifyEmptyLease(leaseRenewer);
} finally {
cluster.shutdown();
}
}
/**
* Test that getAdditionalBlock() and close() are idempotent. This allows
* a client to safely retry a call and still produce a correct
@ -674,6 +728,14 @@ private boolean busyTest(int xcievers, int threads, int fileLen, int timeWin, in
return ret;
}
private void verifyEmptyLease(LeaseRenewer leaseRenewer) throws Exception {
int sleepCount = 0;
while (!leaseRenewer.isEmpty() && sleepCount++ < 20) {
Thread.sleep(500);
}
assertTrue("Lease should be empty.", leaseRenewer.isEmpty());
}
class DFSClientReader implements Runnable {
DFSClient client;