HDFS-3032. Change DFSClient.renewLease() so that it only retries up to the lease soft-limit. Contributed by Kihwal Lee
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1297328 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b9fd9e1759
commit
0ee71adeb4
@ -656,6 +656,9 @@ Release 0.23.2 - UNRELEASED
|
||||
HDFS-3012. Exception while renewing delegation token. (Bobby Evans via
|
||||
jitendra)
|
||||
|
||||
HDFS-3032. Change DFSClient.renewLease() so that it only retries up to the
|
||||
lease soft-limit. (Kihwal Lee via szetszwo)
|
||||
|
||||
Release 0.23.1 - 2012-02-17
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -17,8 +17,38 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.Closeable;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
@ -26,11 +56,8 @@
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
@ -60,8 +87,6 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
@ -101,7 +126,6 @@
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
@ -133,6 +157,7 @@ public class DFSClient implements java.io.Closeable {
|
||||
|
||||
final UserGroupInformation ugi;
|
||||
volatile boolean clientRunning = true;
|
||||
volatile long lastLeaseRenewal;
|
||||
private volatile FsServerDefaults serverDefaults;
|
||||
private volatile long serverDefaultsLastUpdate;
|
||||
final String clientName;
|
||||
@ -381,6 +406,12 @@ void checkOpen() throws IOException {
|
||||
void putFileBeingWritten(final String src, final DFSOutputStream out) {
|
||||
synchronized(filesBeingWritten) {
|
||||
filesBeingWritten.put(src, out);
|
||||
// update the last lease renewal time only when there was no
|
||||
// writes. once there is one write stream open, the lease renewer
|
||||
// thread keeps it updated well with in anyone's expiration time.
|
||||
if (lastLeaseRenewal == 0) {
|
||||
updateLastLeaseRenewal();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -388,6 +419,9 @@ void putFileBeingWritten(final String src, final DFSOutputStream out) {
|
||||
void removeFileBeingWritten(final String src) {
|
||||
synchronized(filesBeingWritten) {
|
||||
filesBeingWritten.remove(src);
|
||||
if (filesBeingWritten.isEmpty()) {
|
||||
lastLeaseRenewal = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -403,6 +437,19 @@ boolean isClientRunning() {
|
||||
return clientRunning;
|
||||
}
|
||||
|
||||
long getLastLeaseRenewal() {
|
||||
return lastLeaseRenewal;
|
||||
}
|
||||
|
||||
void updateLastLeaseRenewal() {
|
||||
synchronized(filesBeingWritten) {
|
||||
if (filesBeingWritten.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
lastLeaseRenewal = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Renew leases.
|
||||
* @return true if lease was renewed. May return false if this
|
||||
@ -410,8 +457,24 @@ boolean isClientRunning() {
|
||||
**/
|
||||
boolean renewLease() throws IOException {
|
||||
if (clientRunning && !isFilesBeingWrittenEmpty()) {
|
||||
try {
|
||||
namenode.renewLease(clientName);
|
||||
updateLastLeaseRenewal();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
// Abort if the lease has already expired.
|
||||
final long elapsed = System.currentTimeMillis() - getLastLeaseRenewal();
|
||||
if (elapsed > HdfsConstants.LEASE_SOFTLIMIT_PERIOD) {
|
||||
LOG.warn("Failed to renew lease for " + clientName + " for "
|
||||
+ (elapsed/1000) + " seconds (>= soft-limit ="
|
||||
+ (HdfsConstants.LEASE_SOFTLIMIT_PERIOD/1000) + " seconds.) "
|
||||
+ "Closing all files being written ...", e);
|
||||
closeAllFilesBeingWritten(true);
|
||||
} else {
|
||||
// Let the lease renewer handle it and retry.
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -430,7 +430,8 @@ private void run(final int id) throws InterruptedException {
|
||||
for(long lastRenewed = System.currentTimeMillis();
|
||||
clientsRunning() && !Thread.interrupted();
|
||||
Thread.sleep(getSleepPeriod())) {
|
||||
if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) {
|
||||
final long elapsed = System.currentTimeMillis() - lastRenewed;
|
||||
if (elapsed >= getRenewalTime()) {
|
||||
try {
|
||||
renew();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -440,7 +441,7 @@ private void run(final int id) throws InterruptedException {
|
||||
lastRenewed = System.currentTimeMillis();
|
||||
} catch (SocketTimeoutException ie) {
|
||||
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
||||
+ (getRenewalTime()/1000) + " seconds. Aborting ...", ie);
|
||||
+ (elapsed/1000) + " seconds. Aborting ...", ie);
|
||||
synchronized (this) {
|
||||
for(DFSClient c : dfsclients) {
|
||||
c.abort();
|
||||
@ -449,8 +450,7 @@ private void run(final int id) throws InterruptedException {
|
||||
break;
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
||||
+ (getRenewalTime()/1000) + " seconds. Will retry shortly ...",
|
||||
ie);
|
||||
+ (elapsed/1000) + " seconds. Will retry shortly ...", ie);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,17 +18,32 @@
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
|
||||
public class TestLease {
|
||||
static boolean hasLease(MiniDFSCluster cluster, Path src) {
|
||||
@ -36,11 +51,80 @@ static boolean hasLease(MiniDFSCluster cluster, Path src) {
|
||||
).getLeaseByPath(src.toString()) != null;
|
||||
}
|
||||
|
||||
final Path dir = new Path("/test/lease/");
|
||||
static final String dirString = "/test/lease";
|
||||
final Path dir = new Path(dirString);
|
||||
static final Log LOG = LogFactory.getLog(TestLease.class);
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
|
||||
@Test
|
||||
public void testLeaseAbort() throws Exception {
|
||||
MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
|
||||
NamenodeProtocols spyNN = spy(preSpyNN);
|
||||
|
||||
DFSClient dfs = new DFSClient(null, spyNN, conf, null);
|
||||
byte[] buf = new byte[1024];
|
||||
|
||||
FSDataOutputStream c_out = createFsOut(dfs, dirString + "c");
|
||||
c_out.write(buf, 0, 1024);
|
||||
c_out.close();
|
||||
|
||||
DFSInputStream c_in = dfs.open(dirString + "c");
|
||||
FSDataOutputStream d_out = createFsOut(dfs, dirString + "d");
|
||||
|
||||
// stub the renew method.
|
||||
doThrow(new RemoteException(InvalidToken.class.getName(),
|
||||
"Your token is worthless")).when(spyNN).renewLease(anyString());
|
||||
|
||||
// We don't need to wait the lease renewer thread to act.
|
||||
// call renewLease() manually.
|
||||
// make it look like lease has already expired.
|
||||
dfs.lastLeaseRenewal = System.currentTimeMillis() - 300000;
|
||||
dfs.renewLease();
|
||||
|
||||
// this should not work.
|
||||
try {
|
||||
d_out.write(buf, 0, 1024);
|
||||
d_out.close();
|
||||
Assert.fail("Write did not fail even after the fatal lease renewal failure");
|
||||
} catch (IOException e) {
|
||||
LOG.info("Write failed as expected. ", e);
|
||||
}
|
||||
|
||||
// unstub
|
||||
doNothing().when(spyNN).renewLease(anyString());
|
||||
|
||||
// existing input streams should work
|
||||
try {
|
||||
int num = c_in.read(buf, 0, 1);
|
||||
if (num != 1) {
|
||||
Assert.fail("Failed to read 1 byte");
|
||||
}
|
||||
c_in.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Read failed with ", e);
|
||||
Assert.fail("Read after lease renewal failure failed");
|
||||
}
|
||||
|
||||
// new file writes should work.
|
||||
try {
|
||||
c_out = createFsOut(dfs, dirString + "c");
|
||||
c_out.write(buf, 0, 1024);
|
||||
c_out.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Write failed with ", e);
|
||||
Assert.fail("Write failed");
|
||||
}
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLease() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||
try {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
@ -94,6 +178,11 @@ public void testFactory() throws Exception {
|
||||
Assert.assertTrue(c3.leaserenewer != c5.leaserenewer);
|
||||
}
|
||||
|
||||
private FSDataOutputStream createFsOut(DFSClient dfs, String path)
|
||||
throws IOException {
|
||||
return new FSDataOutputStream(dfs.create(path, true), null);
|
||||
}
|
||||
|
||||
static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class);
|
||||
static public DFSClient createDFSClientAs(UserGroupInformation ugi,
|
||||
final Configuration conf) throws Exception {
|
||||
|
Loading…
Reference in New Issue
Block a user