HDFS-15704. Mitigate lease monitor's rapid infinite loop. (#2511). Contributed by Daryn Sharp and Ahmed Hussein
(cherry picked from commit c2672bb234
)
This commit is contained in:
parent
3a860e876e
commit
be508718d8
@ -23,15 +23,13 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableSet;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.SortedMap;
|
import java.util.SortedMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.TreeSet;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
@ -92,21 +90,11 @@ public class LeaseManager {
|
|||||||
private long lastHolderUpdateTime;
|
private long lastHolderUpdateTime;
|
||||||
private String internalLeaseHolder;
|
private String internalLeaseHolder;
|
||||||
|
|
||||||
|
//
|
||||||
// Used for handling lock-leases
|
// Used for handling lock-leases
|
||||||
// Mapping: leaseHolder -> Lease
|
// Mapping: leaseHolder -> Lease
|
||||||
private final SortedMap<String, Lease> leases = new TreeMap<>();
|
//
|
||||||
// Set of: Lease
|
private final HashMap<String, Lease> leases = new HashMap<>();
|
||||||
private final NavigableSet<Lease> sortedLeases = new TreeSet<>(
|
|
||||||
new Comparator<Lease>() {
|
|
||||||
@Override
|
|
||||||
public int compare(Lease o1, Lease o2) {
|
|
||||||
if (o1.getLastUpdate() != o2.getLastUpdate()) {
|
|
||||||
return Long.signum(o1.getLastUpdate() - o2.getLastUpdate());
|
|
||||||
} else {
|
|
||||||
return o1.holder.compareTo(o2.holder);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
// INodeID -> Lease
|
// INodeID -> Lease
|
||||||
private final TreeMap<Long, Lease> leasesById = new TreeMap<>();
|
private final TreeMap<Long, Lease> leasesById = new TreeMap<>();
|
||||||
|
|
||||||
@ -344,7 +332,7 @@ public BatchedListEntries<OpenFileEntry> getUnderConstructionFiles(
|
|||||||
/** @return the number of leases currently in the system */
|
/** @return the number of leases currently in the system */
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized int countLease() {
|
public synchronized int countLease() {
|
||||||
return sortedLeases.size();
|
return leases.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return the number of paths contained in all leases */
|
/** @return the number of paths contained in all leases */
|
||||||
@ -360,7 +348,6 @@ synchronized Lease addLease(String holder, long inodeId) {
|
|||||||
if (lease == null) {
|
if (lease == null) {
|
||||||
lease = new Lease(holder);
|
lease = new Lease(holder);
|
||||||
leases.put(holder, lease);
|
leases.put(holder, lease);
|
||||||
sortedLeases.add(lease);
|
|
||||||
} else {
|
} else {
|
||||||
renewLease(lease);
|
renewLease(lease);
|
||||||
}
|
}
|
||||||
@ -386,9 +373,8 @@ private synchronized void removeLease(Lease lease, long inodeId) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!lease.hasFiles()) {
|
if (!lease.hasFiles()) {
|
||||||
leases.remove(lease.holder);
|
if (leases.remove(lease.holder) == null) {
|
||||||
if (!sortedLeases.remove(lease)) {
|
LOG.error("{} not found", lease);
|
||||||
LOG.error("{} not found in sortedLeases", lease);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -407,7 +393,6 @@ synchronized void removeLease(String holder, INodeFile src) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
synchronized void removeAllLeases() {
|
synchronized void removeAllLeases() {
|
||||||
sortedLeases.clear();
|
|
||||||
leasesById.clear();
|
leasesById.clear();
|
||||||
leases.clear();
|
leases.clear();
|
||||||
}
|
}
|
||||||
@ -430,11 +415,10 @@ synchronized Lease reassignLease(Lease lease, INodeFile src,
|
|||||||
synchronized void renewLease(String holder) {
|
synchronized void renewLease(String holder) {
|
||||||
renewLease(getLease(holder));
|
renewLease(getLease(holder));
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void renewLease(Lease lease) {
|
synchronized void renewLease(Lease lease) {
|
||||||
if (lease != null) {
|
if (lease != null) {
|
||||||
sortedLeases.remove(lease);
|
|
||||||
lease.renew();
|
lease.renew();
|
||||||
sortedLeases.add(lease);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -460,8 +444,8 @@ class Lease {
|
|||||||
private final HashSet<Long> files = new HashSet<>();
|
private final HashSet<Long> files = new HashSet<>();
|
||||||
|
|
||||||
/** Only LeaseManager object can create a lease */
|
/** Only LeaseManager object can create a lease */
|
||||||
private Lease(String holder) {
|
private Lease(String h) {
|
||||||
this.holder = holder;
|
this.holder = h;
|
||||||
renew();
|
renew();
|
||||||
}
|
}
|
||||||
/** Only LeaseManager object can renew a lease */
|
/** Only LeaseManager object can renew a lease */
|
||||||
@ -474,6 +458,10 @@ public boolean expiredHardLimit() {
|
|||||||
return monotonicNow() - lastUpdate > hardLimit;
|
return monotonicNow() - lastUpdate > hardLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean expiredHardLimit(long now) {
|
||||||
|
return now - lastUpdate > hardLimit;
|
||||||
|
}
|
||||||
|
|
||||||
/** @return true if the Soft Limit Timer has expired */
|
/** @return true if the Soft Limit Timer has expired */
|
||||||
public boolean expiredSoftLimit() {
|
public boolean expiredSoftLimit() {
|
||||||
return monotonicNow() - lastUpdate > softLimit;
|
return monotonicNow() - lastUpdate > softLimit;
|
||||||
@ -516,6 +504,17 @@ public void setLeasePeriod(long softLimit, long hardLimit) {
|
|||||||
this.hardLimit = hardLimit;
|
this.hardLimit = hardLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized Collection<Lease> getExpiredCandidateLeases() {
|
||||||
|
final long now = Time.monotonicNow();
|
||||||
|
Collection<Lease> expired = new HashSet<>();
|
||||||
|
for (Lease lease : leases.values()) {
|
||||||
|
if (lease.expiredHardLimit(now)) {
|
||||||
|
expired.add(lease);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return expired;
|
||||||
|
}
|
||||||
|
|
||||||
/******************************************************
|
/******************************************************
|
||||||
* Monitor checks for leases that have expired,
|
* Monitor checks for leases that have expired,
|
||||||
* and disposes of them.
|
* and disposes of them.
|
||||||
@ -529,10 +528,19 @@ public void run() {
|
|||||||
for(; shouldRunMonitor && fsnamesystem.isRunning(); ) {
|
for(; shouldRunMonitor && fsnamesystem.isRunning(); ) {
|
||||||
boolean needSync = false;
|
boolean needSync = false;
|
||||||
try {
|
try {
|
||||||
|
// sleep now to avoid infinite loop if an exception was thrown.
|
||||||
|
Thread.sleep(fsnamesystem.getLeaseRecheckIntervalMs());
|
||||||
|
|
||||||
|
// pre-filter the leases w/o the fsn lock.
|
||||||
|
Collection<Lease> candidates = getExpiredCandidateLeases();
|
||||||
|
if (candidates.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
fsnamesystem.writeLockInterruptibly();
|
fsnamesystem.writeLockInterruptibly();
|
||||||
try {
|
try {
|
||||||
if (!fsnamesystem.isInSafeMode()) {
|
if (!fsnamesystem.isInSafeMode()) {
|
||||||
needSync = checkLeases();
|
needSync = checkLeases(candidates);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
fsnamesystem.writeUnlock("leaseManager");
|
fsnamesystem.writeUnlock("leaseManager");
|
||||||
@ -541,8 +549,6 @@ public void run() {
|
|||||||
fsnamesystem.getEditLog().logSync();
|
fsnamesystem.getEditLog().logSync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.sleep(fsnamesystem.getLeaseRecheckIntervalMs());
|
|
||||||
} catch(InterruptedException ie) {
|
} catch(InterruptedException ie) {
|
||||||
LOG.debug("{} is interrupted", name, ie);
|
LOG.debug("{} is interrupted", name, ie);
|
||||||
} catch(Throwable e) {
|
} catch(Throwable e) {
|
||||||
@ -557,17 +563,22 @@ public void run() {
|
|||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
synchronized boolean checkLeases() {
|
synchronized boolean checkLeases() {
|
||||||
|
return checkLeases(getExpiredCandidateLeases());
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized boolean checkLeases(Collection<Lease> leasesToCheck) {
|
||||||
boolean needSync = false;
|
boolean needSync = false;
|
||||||
assert fsnamesystem.hasWriteLock();
|
assert fsnamesystem.hasWriteLock();
|
||||||
|
|
||||||
long start = monotonicNow();
|
long start = monotonicNow();
|
||||||
|
for (Lease leaseToCheck : leasesToCheck) {
|
||||||
while(!sortedLeases.isEmpty() &&
|
if (isMaxLockHoldToReleaseLease(start)) {
|
||||||
sortedLeases.first().expiredHardLimit()
|
break;
|
||||||
&& !isMaxLockHoldToReleaseLease(start)) {
|
}
|
||||||
Lease leaseToCheck = sortedLeases.first();
|
if (!leaseToCheck.expiredHardLimit(Time.monotonicNow())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
LOG.info("{} has expired hard limit", leaseToCheck);
|
LOG.info("{} has expired hard limit", leaseToCheck);
|
||||||
|
|
||||||
final List<Long> removing = new ArrayList<>();
|
final List<Long> removing = new ArrayList<>();
|
||||||
// need to create a copy of the oldest lease files, because
|
// need to create a copy of the oldest lease files, because
|
||||||
// internalReleaseLease() removes files corresponding to empty files,
|
// internalReleaseLease() removes files corresponding to empty files,
|
||||||
@ -629,7 +640,6 @@ synchronized boolean checkLeases() {
|
|||||||
removeLease(leaseToCheck, id);
|
removeLease(leaseToCheck, id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return needSync;
|
return needSync;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -644,7 +654,6 @@ private boolean isMaxLockHoldToReleaseLease(long start) {
|
|||||||
public synchronized String toString() {
|
public synchronized String toString() {
|
||||||
return getClass().getSimpleName() + "= {"
|
return getClass().getSimpleName() + "= {"
|
||||||
+ "\n leases=" + leases
|
+ "\n leases=" + leases
|
||||||
+ "\n sortedLeases=" + sortedLeases
|
|
||||||
+ "\n leasesById=" + leasesById
|
+ "\n leasesById=" + leasesById
|
||||||
+ "\n}";
|
+ "\n}";
|
||||||
}
|
}
|
||||||
|
@ -21,13 +21,11 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.AbstractMap;
|
import java.util.AbstractMap;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
@ -54,7 +52,6 @@
|
|||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
@ -67,7 +64,7 @@
|
|||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LEASE_HARDLIMIT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
|
||||||
@ -385,6 +382,10 @@ public void testDeleteAndLeaseRecoveryHardLimitSnapshot() throws Exception {
|
|||||||
// Disable permissions so that another user can recover the lease.
|
// Disable permissions so that another user can recover the lease.
|
||||||
config.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
config.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
|
||||||
config.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
config.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
long leaseRecheck = 1000;
|
||||||
|
conf.setLong(DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY, leaseRecheck);
|
||||||
|
conf.setLong(DFS_LEASE_HARDLIMIT_KEY, leaseRecheck/1000);
|
||||||
|
|
||||||
FSDataOutputStream stm = null;
|
FSDataOutputStream stm = null;
|
||||||
try {
|
try {
|
||||||
cluster = new MiniDFSCluster.Builder(config).numDataNodes(3).build();
|
cluster = new MiniDFSCluster.Builder(config).numDataNodes(3).build();
|
||||||
@ -409,30 +410,8 @@ public void testDeleteAndLeaseRecoveryHardLimitSnapshot() throws Exception {
|
|||||||
// the streamer.
|
// the streamer.
|
||||||
AppendTestUtil.write(stm, 0, BLOCK_SIZE);
|
AppendTestUtil.write(stm, 0, BLOCK_SIZE);
|
||||||
|
|
||||||
// Mock a scenario that the lease reached hard limit.
|
|
||||||
final LeaseManager lm = (LeaseManager) Whitebox
|
|
||||||
.getInternalState(cluster.getNameNode().getNamesystem(),
|
|
||||||
"leaseManager");
|
|
||||||
final TreeSet<Lease> leases =
|
|
||||||
(TreeSet<Lease>) Whitebox.getInternalState(lm, "sortedLeases");
|
|
||||||
final TreeSet<Lease> spyLeases = new TreeSet<>(new Comparator<Lease>() {
|
|
||||||
@Override
|
|
||||||
public int compare(Lease o1, Lease o2) {
|
|
||||||
return Long.signum(o1.getLastUpdate() - o2.getLastUpdate());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
while (!leases.isEmpty()) {
|
|
||||||
final Lease lease = leases.first();
|
|
||||||
final Lease spyLease = Mockito.spy(lease);
|
|
||||||
Mockito.doReturn(true).when(spyLease).expiredHardLimit();
|
|
||||||
spyLeases.add(spyLease);
|
|
||||||
leases.remove(lease);
|
|
||||||
}
|
|
||||||
Whitebox.setInternalState(lm, "sortedLeases", spyLeases);
|
|
||||||
|
|
||||||
// wait for lease manager's background 'Monitor' class to check leases.
|
// wait for lease manager's background 'Monitor' class to check leases.
|
||||||
Thread.sleep(2 * conf.getLong(DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY,
|
Thread.sleep(2 * leaseRecheck);
|
||||||
DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_DEFAULT));
|
|
||||||
|
|
||||||
LOG.info("Now check we can restart");
|
LOG.info("Now check we can restart");
|
||||||
cluster.restartNameNodes();
|
cluster.restartNameNodes();
|
||||||
|
Loading…
Reference in New Issue
Block a user