diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0a26b018b8..f07c5f8e8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -656,6 +656,9 @@ Release 0.23.2 - UNRELEASED
     HDFS-3012. Exception while renewing delegation token. (Bobby Evans via
     jitendra)
 
+    HDFS-3032. Change DFSClient.renewLease() so that it only retries up to the
+    lease soft-limit. (Kihwal Lee via szetszwo)
+
 Release 0.23.1 - 2012-02-17
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 88b36b73b9..110a50b269 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -17,8 +17,38 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+
 import java.io.BufferedOutputStream;
-import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
@@ -26,11 +56,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
 import java.net.Socket;
-import java.net.SocketException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -60,8 +87,6 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -101,7 +126,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
@@ -133,6 +157,7 @@ public class DFSClient implements java.io.Closeable {
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
+  volatile long lastLeaseRenewal;
   private volatile FsServerDefaults serverDefaults;
   private volatile long serverDefaultsLastUpdate;
   final String clientName;
@@ -381,6 +406,12 @@ public class DFSClient implements java.io.Closeable {
   void putFileBeingWritten(final String src, final DFSOutputStream out) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.put(src, out);
+      // Update the last lease renewal time only when there were no
+      // writes. Once there is one write stream open, the lease renewer
+      // thread keeps it updated well within anyone's expiration time.
+      if (lastLeaseRenewal == 0) {
+        updateLastLeaseRenewal();
+      }
     }
   }
 
@@ -388,6 +419,9 @@
   void removeFileBeingWritten(final String src) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.remove(src);
+      if (filesBeingWritten.isEmpty()) {
+        lastLeaseRenewal = 0;
+      }
     }
   }
 
@@ -403,6 +437,19 @@
     return clientRunning;
   }
 
+  long getLastLeaseRenewal() {
+    return lastLeaseRenewal;
+  }
+
+  void updateLastLeaseRenewal() {
+    synchronized(filesBeingWritten) {
+      if (filesBeingWritten.isEmpty()) {
+        return;
+      }
+      lastLeaseRenewal = System.currentTimeMillis();
+    }
+  }
+
   /**
    * Renew leases.
    * @return true if lease was renewed. May return false if this
@@ -410,8 +457,24 @@
    **/
   boolean renewLease() throws IOException {
     if (clientRunning && !isFilesBeingWrittenEmpty()) {
-      namenode.renewLease(clientName);
-      return true;
+      try {
+        namenode.renewLease(clientName);
+        updateLastLeaseRenewal();
+        return true;
+      } catch (IOException e) {
+        // Abort if the lease has already expired.
+        final long elapsed = System.currentTimeMillis() - getLastLeaseRenewal();
+        if (elapsed > HdfsConstants.LEASE_SOFTLIMIT_PERIOD) {
+          LOG.warn("Failed to renew lease for " + clientName + " for "
+              + (elapsed/1000) + " seconds (>= soft-limit = "
+              + (HdfsConstants.LEASE_SOFTLIMIT_PERIOD/1000) + " seconds). "
+              + "Closing all files being written ...", e);
+          closeAllFilesBeingWritten(true);
+        } else {
+          // Let the lease renewer handle it and retry.
+          throw e;
+        }
+      }
     }
     return false;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
index 862be0c184..471f7342e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
@@ -430,7 +430,8 @@ class LeaseRenewer {
     for(long lastRenewed = System.currentTimeMillis();
         clientsRunning() && !Thread.interrupted();
         Thread.sleep(getSleepPeriod())) {
-      if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) {
+      final long elapsed = System.currentTimeMillis() - lastRenewed;
+      if (elapsed >= getRenewalTime()) {
         try {
           renew();
           if (LOG.isDebugEnabled()) {
@@ -440,7 +441,7 @@ class LeaseRenewer {
           lastRenewed = System.currentTimeMillis();
         } catch (SocketTimeoutException ie) {
           LOG.warn("Failed to renew lease for " + clientsString() + " for "
-              + (getRenewalTime()/1000) + " seconds. Aborting ...", ie);
+              + (elapsed/1000) + " seconds. Aborting ...", ie);
           synchronized (this) {
             for(DFSClient c : dfsclients) {
               c.abort();
@@ -449,8 +450,7 @@ class LeaseRenewer {
           break;
         } catch (IOException ie) {
           LOG.warn("Failed to renew lease for " + clientsString() + " for "
-              + (getRenewalTime()/1000) + " seconds. Will retry shortly ...",
-              ie);
+              + (elapsed/1000) + " seconds. Will retry shortly ...", ie);
Will retry shortly ...", ie); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java index a90c9d2ef3..aa41ef0563 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java @@ -18,29 +18,113 @@ package org.apache.hadoop.hdfs; import java.io.DataOutputStream; +import java.io.IOException; import java.security.PrivilegedExceptionAction; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.doNothing; public class TestLease { static boolean hasLease(MiniDFSCluster cluster, Path src) { return NameNodeAdapter.getLeaseManager(cluster.getNamesystem() ).getLeaseByPath(src.toString()) != null; } - - final Path dir = new Path("/test/lease/"); + + static final String dirString = "/test/lease"; + final Path dir = new Path(dirString); + static final Log LOG = LogFactory.getLog(TestLease.class); + Configuration conf = new HdfsConfiguration(); + + @Test + public void testLeaseAbort() throws Exception { + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + try { + cluster.waitActive(); + NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); + NamenodeProtocols spyNN = spy(preSpyNN); + + DFSClient dfs = new DFSClient(null, spyNN, conf, null); + byte[] buf = new byte[1024]; + + FSDataOutputStream c_out = createFsOut(dfs, dirString + "c"); + c_out.write(buf, 0, 1024); + c_out.close(); + + DFSInputStream c_in = dfs.open(dirString + "c"); + FSDataOutputStream d_out = createFsOut(dfs, dirString + "d"); + + // stub the renew method. + doThrow(new RemoteException(InvalidToken.class.getName(), + "Your token is worthless")).when(spyNN).renewLease(anyString()); + + // We don't need to wait the lease renewer thread to act. + // call renewLease() manually. + // make it look like lease has already expired. + dfs.lastLeaseRenewal = System.currentTimeMillis() - 300000; + dfs.renewLease(); + + // this should not work. + try { + d_out.write(buf, 0, 1024); + d_out.close(); + Assert.fail("Write did not fail even after the fatal lease renewal failure"); + } catch (IOException e) { + LOG.info("Write failed as expected. 
", e); + } + + // unstub + doNothing().when(spyNN).renewLease(anyString()); + + // existing input streams should work + try { + int num = c_in.read(buf, 0, 1); + if (num != 1) { + Assert.fail("Failed to read 1 byte"); + } + c_in.close(); + } catch (IOException e) { + LOG.error("Read failed with ", e); + Assert.fail("Read after lease renewal failure failed"); + } + + // new file writes should work. + try { + c_out = createFsOut(dfs, dirString + "c"); + c_out.write(buf, 0, 1024); + c_out.close(); + } catch (IOException e) { + LOG.error("Write failed with ", e); + Assert.fail("Write failed"); + } + } finally { + cluster.shutdown(); + } + } @Test public void testLease() throws Exception { - Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); try { FileSystem fs = cluster.getFileSystem(); @@ -94,6 +178,11 @@ public class TestLease { Assert.assertTrue(c3.leaserenewer != c5.leaserenewer); } + private FSDataOutputStream createFsOut(DFSClient dfs, String path) + throws IOException { + return new FSDataOutputStream(dfs.create(path, true), null); + } + static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class); static public DFSClient createDFSClientAs(UserGroupInformation ugi, final Configuration conf) throws Exception {