HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package. Contributed by Takanobu
This commit is contained in:
parent
64d30a6186
commit
d3d019c337
@ -494,6 +494,9 @@ Release 2.8.0 - UNRELEASED
|
||||
HDFS-8292. Move conditional in fmt_time from dfs-dust.js to status.html.
|
||||
(Charles Lamb via wang)
|
||||
|
||||
HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package. (Takanobu
|
||||
Asanuma via szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
@ -101,6 +101,7 @@
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
@ -481,7 +482,7 @@ void endFileLease(final long inodeId) throws IOException {
|
||||
* enforced to consistently update its local dfsclients array and
|
||||
* client's filesBeingWritten map.
|
||||
*/
|
||||
void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
|
||||
public void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
|
||||
synchronized(filesBeingWritten) {
|
||||
filesBeingWritten.put(inodeId, out);
|
||||
// update the last lease renewal time only when there was no
|
||||
@ -494,7 +495,7 @@ void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
|
||||
}
|
||||
|
||||
/** Remove a file. Only called from LeaseRenewer. */
|
||||
void removeFileBeingWritten(final long inodeId) {
|
||||
public void removeFileBeingWritten(final long inodeId) {
|
||||
synchronized(filesBeingWritten) {
|
||||
filesBeingWritten.remove(inodeId);
|
||||
if (filesBeingWritten.isEmpty()) {
|
||||
@ -504,14 +505,14 @@ void removeFileBeingWritten(final long inodeId) {
|
||||
}
|
||||
|
||||
/** Is file-being-written map empty? */
|
||||
boolean isFilesBeingWrittenEmpty() {
|
||||
public boolean isFilesBeingWrittenEmpty() {
|
||||
synchronized(filesBeingWritten) {
|
||||
return filesBeingWritten.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
/** @return true if the client is running */
|
||||
boolean isClientRunning() {
|
||||
public boolean isClientRunning() {
|
||||
return clientRunning;
|
||||
}
|
||||
|
||||
@ -533,7 +534,7 @@ void updateLastLeaseRenewal() {
|
||||
* @return true if lease was renewed. May return false if this
|
||||
* client has been closed or has no files open.
|
||||
**/
|
||||
boolean renewLease() throws IOException {
|
||||
public boolean renewLease() throws IOException {
|
||||
if (clientRunning && !isFilesBeingWrittenEmpty()) {
|
||||
try {
|
||||
namenode.renewLease(clientName);
|
||||
@ -565,7 +566,7 @@ void closeConnectionToNamenode() {
|
||||
}
|
||||
|
||||
/** Abort and release resources held. Ignore all errors. */
|
||||
void abort() {
|
||||
public void abort() {
|
||||
clientRunning = false;
|
||||
closeAllFilesBeingWritten(true);
|
||||
try {
|
||||
|
@ -15,7 +15,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.client.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
@ -31,6 +31,8 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
@ -40,7 +42,7 @@
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Used by {@link DFSClient} for renewing file-being-written leases
|
||||
* Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing file-being-written leases
|
||||
* on the namenode.
|
||||
* When a file is opened for write (create or append),
|
||||
* namenode stores a file lease for recording the identity of the writer.
|
||||
@ -53,12 +55,12 @@
|
||||
* This class also provides the following functionality:
|
||||
* <ul>
|
||||
* <li>
|
||||
* It maintains a map from (namenode, user) pairs to lease renewers.
|
||||
* It maintains a map from (namenode, user) pairs to lease renewers.
|
||||
* The same {@link LeaseRenewer} instance is used for renewing lease
|
||||
* for all the {@link DFSClient} to the same namenode and the same user.
|
||||
* for all the {@link org.apache.hadoop.hdfs.DFSClient} to the same namenode and the same user.
|
||||
* </li>
|
||||
* <li>
|
||||
* Each renewer maintains a list of {@link DFSClient}.
|
||||
* Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}.
|
||||
* Periodically the leases for all the clients are renewed.
|
||||
* A client is removed from the list when the client is closed.
|
||||
* </li>
|
||||
@ -70,21 +72,21 @@
|
||||
* </p>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class LeaseRenewer {
|
||||
public class LeaseRenewer {
|
||||
static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
|
||||
|
||||
static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
|
||||
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
||||
|
||||
/** Get a {@link LeaseRenewer} instance */
|
||||
static LeaseRenewer getInstance(final String authority,
|
||||
public static LeaseRenewer getInstance(final String authority,
|
||||
final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
|
||||
final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
|
||||
r.addClient(dfsc);
|
||||
return r;
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* A factory for sharing {@link LeaseRenewer} objects
|
||||
* among {@link DFSClient} instances
|
||||
* so that there is only one renewer per authority per user.
|
||||
@ -124,7 +126,7 @@ public boolean equals(Object obj) {
|
||||
return this.authority.equals(that.authority)
|
||||
&& this.ugi.equals(that.ugi);
|
||||
}
|
||||
return false;
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -170,17 +172,17 @@ private synchronized void remove(final LeaseRenewer r) {
|
||||
/** Only the daemon with currentId should run. */
|
||||
private int currentId = 0;
|
||||
|
||||
/**
|
||||
/**
|
||||
* A period in milliseconds that the lease renewer thread should run
|
||||
* after the map became empty.
|
||||
* In other words,
|
||||
* if the map is empty for a time period longer than the grace period,
|
||||
* the renewer should terminate.
|
||||
* the renewer should terminate.
|
||||
*/
|
||||
private long gracePeriod;
|
||||
/**
|
||||
* The time period in milliseconds
|
||||
* that the renewer sleeps for each iteration.
|
||||
* that the renewer sleeps for each iteration.
|
||||
*/
|
||||
private long sleepPeriod;
|
||||
|
||||
@ -199,7 +201,7 @@ private synchronized void remove(final LeaseRenewer r) {
|
||||
private LeaseRenewer(Factory.Key factorykey) {
|
||||
this.factorykey = factorykey;
|
||||
unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
|
||||
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
instantiationTrace = StringUtils.stringifyException(
|
||||
new Throwable("TRACE"));
|
||||
@ -244,7 +246,7 @@ private synchronized boolean clientsRunning() {
|
||||
}
|
||||
|
||||
private synchronized long getSleepPeriod() {
|
||||
return sleepPeriod;
|
||||
return sleepPeriod;
|
||||
}
|
||||
|
||||
/** Set the grace period and adjust the sleep period accordingly. */
|
||||
@ -272,19 +274,19 @@ synchronized boolean isRunning() {
|
||||
public boolean isEmpty() {
|
||||
return dfsclients.isEmpty();
|
||||
}
|
||||
|
||||
|
||||
/** Used only by tests */
|
||||
synchronized String getDaemonName() {
|
||||
return daemon.getName();
|
||||
}
|
||||
|
||||
/** Is the empty period longer than the grace period? */
|
||||
/** Is the empty period longer than the grace period? */
|
||||
private synchronized boolean isRenewerExpired() {
|
||||
return emptyTime != Long.MAX_VALUE
|
||||
&& Time.monotonicNow() - emptyTime > gracePeriod;
|
||||
}
|
||||
|
||||
synchronized void put(final long inodeId, final DFSOutputStream out,
|
||||
public synchronized void put(final long inodeId, final DFSOutputStream out,
|
||||
final DFSClient dfsc) {
|
||||
if (dfsc.isClientRunning()) {
|
||||
if (!isRunning() || isRenewerExpired()) {
|
||||
@ -314,7 +316,7 @@ public void run() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.valueOf(LeaseRenewer.this);
|
||||
@ -333,7 +335,7 @@ synchronized void setEmptyTime(long time) {
|
||||
}
|
||||
|
||||
/** Close a file. */
|
||||
void closeFile(final long inodeId, final DFSClient dfsc) {
|
||||
public void closeFile(final long inodeId, final DFSClient dfsc) {
|
||||
dfsc.removeFileBeingWritten(inodeId);
|
||||
|
||||
synchronized(this) {
|
||||
@ -355,7 +357,7 @@ void closeFile(final long inodeId, final DFSClient dfsc) {
|
||||
}
|
||||
|
||||
/** Close the given client. */
|
||||
synchronized void closeClient(final DFSClient dfsc) {
|
||||
public synchronized void closeClient(final DFSClient dfsc) {
|
||||
dfsclients.remove(dfsc);
|
||||
if (dfsclients.isEmpty()) {
|
||||
if (!isRunning() || isRenewerExpired()) {
|
||||
@ -381,7 +383,7 @@ synchronized void closeClient(final DFSClient dfsc) {
|
||||
}
|
||||
}
|
||||
|
||||
void interruptAndJoin() throws InterruptedException {
|
||||
public void interruptAndJoin() throws InterruptedException {
|
||||
Daemon daemonCopy = null;
|
||||
synchronized (this) {
|
||||
if (isRunning()) {
|
||||
@ -389,7 +391,7 @@ void interruptAndJoin() throws InterruptedException {
|
||||
daemonCopy = daemon;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (daemonCopy != null) {
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wait for lease checker to terminate");
|
@ -31,6 +31,7 @@
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.SocketTimeoutException;
|
||||
@ -64,6 +65,7 @@
|
||||
import org.apache.hadoop.fs.VolumeId;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
@ -264,78 +266,84 @@ public void testDFSClient() throws Exception {
|
||||
|
||||
{
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace);
|
||||
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
Method setMethod = dfs.dfs.getLeaseRenewer().getClass()
|
||||
.getDeclaredMethod("setGraceSleepPeriod", long.class);
|
||||
setMethod.setAccessible(true);
|
||||
setMethod.invoke(dfs.dfs.getLeaseRenewer(), grace);
|
||||
Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
|
||||
.getDeclaredMethod("isRunning");
|
||||
checkMethod.setAccessible(true);
|
||||
assertFalse((boolean) checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
|
||||
{
|
||||
//create a file
|
||||
final FSDataOutputStream out = dfs.create(filepaths[0]);
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
//write something
|
||||
out.writeLong(millis);
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
//close
|
||||
out.close();
|
||||
Thread.sleep(grace/4*3);
|
||||
//within grace period
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
for(int i = 0; i < 3; i++) {
|
||||
if (dfs.dfs.getLeaseRenewer().isRunning()) {
|
||||
if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) {
|
||||
Thread.sleep(grace/2);
|
||||
}
|
||||
}
|
||||
//passed grace period
|
||||
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
}
|
||||
|
||||
{
|
||||
//create file1
|
||||
final FSDataOutputStream out1 = dfs.create(filepaths[1]);
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
//create file2
|
||||
final FSDataOutputStream out2 = dfs.create(filepaths[2]);
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
|
||||
//write something to file1
|
||||
out1.writeLong(millis);
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
//close file1
|
||||
out1.close();
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
|
||||
//write something to file2
|
||||
out2.writeLong(millis);
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
//close file2
|
||||
out2.close();
|
||||
Thread.sleep(grace/4*3);
|
||||
//within grace period
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
}
|
||||
|
||||
{
|
||||
//create file3
|
||||
final FSDataOutputStream out3 = dfs.create(filepaths[3]);
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
Thread.sleep(grace/4*3);
|
||||
//passed previous grace period, should still running
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
//write something to file3
|
||||
out3.writeLong(millis);
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
//close file3
|
||||
out3.close();
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
Thread.sleep(grace/4*3);
|
||||
//within grace period
|
||||
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
for(int i = 0; i < 3; i++) {
|
||||
if (dfs.dfs.getLeaseRenewer().isRunning()) {
|
||||
if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) {
|
||||
Thread.sleep(grace/2);
|
||||
}
|
||||
}
|
||||
//passed grace period
|
||||
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
}
|
||||
|
||||
dfs.close();
|
||||
@ -364,15 +372,18 @@ public void testDFSClient() throws Exception {
|
||||
|
||||
{
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
|
||||
.getDeclaredMethod("isRunning");
|
||||
checkMethod.setAccessible(true);
|
||||
assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
|
||||
//open and check the file
|
||||
FSDataInputStream in = dfs.open(filepaths[0]);
|
||||
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
assertEquals(millis, in.readLong());
|
||||
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
in.close();
|
||||
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
|
||||
assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
|
||||
dfs.close();
|
||||
}
|
||||
|
||||
|
@ -45,6 +45,7 @@
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
|
@ -15,13 +15,15 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
package org.apache.hadoop.hdfs.client.impl;
|
||||
|
||||
import static org.junit.Assert.assertSame;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
@ -46,19 +48,19 @@ public class TestLeaseRenewer {
|
||||
|
||||
private DFSClient MOCK_DFSCLIENT;
|
||||
private LeaseRenewer renewer;
|
||||
|
||||
|
||||
/** Cause renewals often so test runs quickly. */
|
||||
private static final long FAST_GRACE_PERIOD = 100L;
|
||||
|
||||
|
||||
@Before
|
||||
public void setupMocksAndRenewer() throws IOException {
|
||||
MOCK_DFSCLIENT = createMockClient();
|
||||
|
||||
|
||||
renewer = LeaseRenewer.getInstance(
|
||||
FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
|
||||
renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
|
||||
}
|
||||
|
||||
|
||||
private DFSClient createMockClient() {
|
||||
final DfsClientConf mockConf = Mockito.mock(DfsClientConf.class);
|
||||
Mockito.doReturn((int)FAST_GRACE_PERIOD).when(mockConf).getHdfsTimeout();
|
||||
@ -79,12 +81,12 @@ public void testInstanceSharing() throws IOException {
|
||||
LeaseRenewer lr2 = LeaseRenewer.getInstance(
|
||||
FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
|
||||
Assert.assertSame(lr, lr2);
|
||||
|
||||
|
||||
// But a different UGI should return a different instance
|
||||
LeaseRenewer lr3 = LeaseRenewer.getInstance(
|
||||
FAKE_AUTHORITY, FAKE_UGI_B, MOCK_DFSCLIENT);
|
||||
Assert.assertNotSame(lr, lr3);
|
||||
|
||||
|
||||
// A different authority with same UGI should also be a different
|
||||
// instance.
|
||||
LeaseRenewer lr4 = LeaseRenewer.getInstance(
|
||||
@ -92,7 +94,7 @@ public void testInstanceSharing() throws IOException {
|
||||
Assert.assertNotSame(lr, lr4);
|
||||
Assert.assertNotSame(lr3, lr4);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRenewal() throws Exception {
|
||||
// Keep track of how many times the lease gets renewed
|
||||
@ -105,7 +107,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
||||
}
|
||||
}).when(MOCK_DFSCLIENT).renewLease();
|
||||
|
||||
|
||||
|
||||
// Set up a file so that we start renewing our lease.
|
||||
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
|
||||
long fileId = 123L;
|
||||
@ -123,7 +125,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
||||
|
||||
renewer.closeFile(fileId, MOCK_DFSCLIENT);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Regression test for HDFS-2810. In this bug, the LeaseRenewer has handles
|
||||
* to several DFSClients with the same name, the first of which has no files
|
||||
@ -136,7 +138,7 @@ public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
|
||||
Mockito.doReturn(false).when(mockClient1).renewLease();
|
||||
assertSame(renewer, LeaseRenewer.getInstance(
|
||||
FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
|
||||
|
||||
|
||||
// Set up a file so that we start renewing our lease.
|
||||
DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
|
||||
long fileId = 456L;
|
||||
@ -152,7 +154,7 @@ public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
|
||||
DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
|
||||
renewer.put(fileId, mockStream2, mockClient2);
|
||||
|
||||
|
||||
|
||||
// Wait for lease to get renewed
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
@ -174,28 +176,28 @@ public Boolean get() {
|
||||
renewer.closeFile(fileId, mockClient1);
|
||||
renewer.closeFile(fileId, mockClient2);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testThreadName() throws Exception {
|
||||
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
|
||||
long fileId = 789L;
|
||||
Assert.assertFalse("Renewer not initially running",
|
||||
renewer.isRunning());
|
||||
|
||||
|
||||
// Pretend to open a file
|
||||
renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
|
||||
|
||||
|
||||
Assert.assertTrue("Renewer should have started running",
|
||||
renewer.isRunning());
|
||||
|
||||
|
||||
// Check the thread name is reasonable
|
||||
String threadName = renewer.getDaemonName();
|
||||
Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName);
|
||||
|
||||
|
||||
// Pretend to close the file
|
||||
renewer.closeFile(fileId, MOCK_DFSCLIENT);
|
||||
renewer.setEmptyTime(Time.monotonicNow());
|
||||
|
||||
|
||||
// Should stop the renewer running within a few seconds
|
||||
long failTime = Time.monotonicNow() + 5000;
|
||||
while (renewer.isRunning() && Time.monotonicNow() < failTime) {
|
||||
@ -203,5 +205,5 @@ public void testThreadName() throws Exception {
|
||||
}
|
||||
Assert.assertFalse(renewer.isRunning());
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user