HDFS-2810. Leases not getting renewed properly by clients. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1233794 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b7eb5334f5
commit
520a39ac2d
@ -338,6 +338,8 @@ Release 0.23.1 - UNRELEASED
|
||||
HDFS-2790. FSNamesystem.setTimes throws exception with wrong
|
||||
configuration name in the message. (Arpit Gupta via eli)
|
||||
|
||||
HDFS-2810. Leases not getting renewed properly by clients (todd)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -373,11 +373,17 @@ boolean isClientRunning() {
|
||||
return clientRunning;
|
||||
}
|
||||
|
||||
/** Renew leases */
|
||||
void renewLease() throws IOException {
|
||||
/**
|
||||
* Renew leases.
|
||||
* @return true if lease was renewed. May return false if this
|
||||
* client has been closed or has no files open.
|
||||
**/
|
||||
boolean renewLease() throws IOException {
|
||||
if (clientRunning && !isFilesBeingWrittenEmpty()) {
|
||||
namenode.renewLease(clientName);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -67,7 +67,7 @@
|
||||
* </p>
|
||||
*/
|
||||
class LeaseRenewer {
|
||||
private static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
|
||||
static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
|
||||
|
||||
static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
|
||||
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
||||
@ -407,7 +407,13 @@ public int compare(final DFSClient left, final DFSClient right) {
|
||||
final DFSClient c = copies.get(i);
|
||||
//skip if current client name is the same as the previous name.
|
||||
if (!c.getClientName().equals(previousName)) {
|
||||
c.renewLease();
|
||||
if (!c.renewLease()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Did not renew lease for client " +
|
||||
c);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
previousName = c.getClientName();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Lease renewed for client " + previousName);
|
||||
|
@ -17,11 +17,14 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -29,6 +32,8 @@
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
public class TestLeaseRenewer {
|
||||
private String FAKE_AUTHORITY="hdfs://nn1/";
|
||||
private UserGroupInformation FAKE_UGI_A =
|
||||
@ -46,19 +51,24 @@ public class TestLeaseRenewer {
|
||||
|
||||
@Before
|
||||
public void setupMocksAndRenewer() throws IOException {
|
||||
MOCK_DFSCLIENT = Mockito.mock(DFSClient.class);
|
||||
Mockito.doReturn(true)
|
||||
.when(MOCK_DFSCLIENT).isClientRunning();
|
||||
Mockito.doReturn((int)FAST_GRACE_PERIOD)
|
||||
.when(MOCK_DFSCLIENT).getHdfsTimeout();
|
||||
Mockito.doReturn("myclient")
|
||||
.when(MOCK_DFSCLIENT).getClientName();
|
||||
MOCK_DFSCLIENT = createMockClient();
|
||||
|
||||
renewer = LeaseRenewer.getInstance(
|
||||
FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
|
||||
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
|
||||
}
|
||||
|
||||
private DFSClient createMockClient() {
|
||||
DFSClient mock = Mockito.mock(DFSClient.class);
|
||||
Mockito.doReturn(true)
|
||||
.when(mock).isClientRunning();
|
||||
Mockito.doReturn((int)FAST_GRACE_PERIOD)
|
||||
.when(mock).getHdfsTimeout();
|
||||
Mockito.doReturn("myclient")
|
||||
.when(mock).getClientName();
|
||||
return mock;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInstanceSharing() throws IOException {
|
||||
// Two lease renewers with the same UGI should return
|
||||
@ -93,11 +103,11 @@ public void testClientName() throws IOException {
|
||||
public void testRenewal() throws Exception {
|
||||
// Keep track of how many times the lease gets renewed
|
||||
final AtomicInteger leaseRenewalCount = new AtomicInteger();
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
Mockito.doAnswer(new Answer<Boolean>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
||||
leaseRenewalCount.incrementAndGet();
|
||||
return null;
|
||||
return true;
|
||||
}
|
||||
}).when(MOCK_DFSCLIENT).renewLease();
|
||||
|
||||
@ -120,6 +130,57 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
renewer.closeFile(filePath, MOCK_DFSCLIENT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles
|
||||
* to several DFSClients with the same name, the first of which has no files
|
||||
* open. Previously, this was causing the lease to not get renewed.
|
||||
*/
|
||||
@Test
|
||||
public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
|
||||
// First DFSClient has no files open so doesn't renew leases.
|
||||
final DFSClient mockClient1 = createMockClient();
|
||||
Mockito.doReturn(false).when(mockClient1).renewLease();
|
||||
assertSame(renewer, LeaseRenewer.getInstance(
|
||||
FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
|
||||
|
||||
// Set up a file so that we start renewing our lease.
|
||||
DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
|
||||
String filePath = "/foo";
|
||||
renewer.put(filePath, mockStream1, mockClient1);
|
||||
|
||||
// Second DFSClient does renew lease
|
||||
final DFSClient mockClient2 = createMockClient();
|
||||
Mockito.doReturn(true).when(mockClient2).renewLease();
|
||||
assertSame(renewer, LeaseRenewer.getInstance(
|
||||
FAKE_AUTHORITY, FAKE_UGI_A, mockClient2));
|
||||
|
||||
// Set up a file so that we start renewing our lease.
|
||||
DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
|
||||
renewer.put(filePath, mockStream2, mockClient2);
|
||||
|
||||
|
||||
// Wait for lease to get renewed
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
try {
|
||||
Mockito.verify(mockClient1, Mockito.atLeastOnce()).renewLease();
|
||||
Mockito.verify(mockClient2, Mockito.atLeastOnce()).renewLease();
|
||||
return true;
|
||||
} catch (AssertionError err) {
|
||||
LeaseRenewer.LOG.warn("Not yet satisfied", err);
|
||||
return false;
|
||||
} catch (IOException e) {
|
||||
// should not throw!
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}, 100, 10000);
|
||||
|
||||
renewer.closeFile(filePath, mockClient1);
|
||||
renewer.closeFile(filePath, mockClient2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThreadName() throws Exception {
|
||||
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
|
||||
|
Loading…
Reference in New Issue
Block a user