Revert "HDFS-8344. NameNode doesn't recover lease for files with missing blocks (raviprak)"
This reverts commit e4f756260f
.
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
parent
68d1f4bfe8
commit
5137b388fc
@ -1056,9 +1056,6 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-8778. TestBlockReportRateLimiting#testLeaseExpiration can deadlock.
|
HDFS-8778. TestBlockReportRateLimiting#testLeaseExpiration can deadlock.
|
||||||
(Arpit Agarwal)
|
(Arpit Agarwal)
|
||||||
|
|
||||||
HDFS-8344. NameNode doesn't recover lease for files with missing blocks
|
|
||||||
(raviprak)
|
|
||||||
|
|
||||||
HDFS-7582. Enforce maximum number of ACL entries separately per access
|
HDFS-7582. Enforce maximum number of ACL entries separately per access
|
||||||
and default. (vinayakumarb)
|
and default. (vinayakumarb)
|
||||||
|
|
||||||
|
@ -440,9 +440,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
|
public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
|
||||||
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
|
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
|
||||||
public static final int DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 1000;
|
public static final int DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 1000;
|
||||||
public static final String DFS_BLOCK_UC_MAX_RECOVERY_ATTEMPTS = "dfs.block.uc.max.recovery.attempts";
|
|
||||||
public static final int DFS_BLOCK_UC_MAX_RECOVERY_ATTEMPTS_DEFAULT = 5;
|
|
||||||
|
|
||||||
public static final String DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";
|
public static final String DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";
|
||||||
public static final int DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED = 500;
|
public static final int DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED = 500;
|
||||||
/* Maximum number of blocks to process for initializing replication queues */
|
/* Maximum number of blocks to process for initializing replication queues */
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -60,11 +61,6 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
|
|||||||
*/
|
*/
|
||||||
protected Block truncateBlock;
|
protected Block truncateBlock;
|
||||||
|
|
||||||
/** The number of times all replicas will be used to attempt recovery before
|
|
||||||
* giving up and marking the block under construction missing.
|
|
||||||
*/
|
|
||||||
private int recoveryAttemptsBeforeMarkingBlockMissing;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ReplicaUnderConstruction contains information about replicas while
|
* ReplicaUnderConstruction contains information about replicas while
|
||||||
* they are under construction.
|
* they are under construction.
|
||||||
@ -178,8 +174,6 @@ public BlockInfoUnderConstruction(Block blk, short replication,
|
|||||||
"BlockInfoUnderConstruction cannot be in COMPLETE state");
|
"BlockInfoUnderConstruction cannot be in COMPLETE state");
|
||||||
this.blockUCState = state;
|
this.blockUCState = state;
|
||||||
setExpectedLocations(targets);
|
setExpectedLocations(targets);
|
||||||
this.recoveryAttemptsBeforeMarkingBlockMissing =
|
|
||||||
BlockManager.getMaxBlockUCRecoveries();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Set expected locations. */
|
/** Set expected locations. */
|
||||||
@ -277,7 +271,7 @@ public void initializeBlockRecovery(long recoveryId) {
|
|||||||
if (replicas.size() == 0) {
|
if (replicas.size() == 0) {
|
||||||
NameNode.blockStateChangeLog.warn("BLOCK* " +
|
NameNode.blockStateChangeLog.warn("BLOCK* " +
|
||||||
"BlockInfoUnderConstruction.initLeaseRecovery: " +
|
"BlockInfoUnderConstruction.initLeaseRecovery: " +
|
||||||
"No replicas found.");
|
"No blocks found, lease removed.");
|
||||||
}
|
}
|
||||||
boolean allLiveReplicasTriedAsPrimary = true;
|
boolean allLiveReplicasTriedAsPrimary = true;
|
||||||
for (int i = 0; i < replicas.size(); i++) {
|
for (int i = 0; i < replicas.size(); i++) {
|
||||||
@ -289,11 +283,6 @@ public void initializeBlockRecovery(long recoveryId) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (allLiveReplicasTriedAsPrimary) {
|
if (allLiveReplicasTriedAsPrimary) {
|
||||||
recoveryAttemptsBeforeMarkingBlockMissing--;
|
|
||||||
NameNode.blockStateChangeLog.info("Tried to recover " + this +" using all"
|
|
||||||
+ " replicas. Will try " + recoveryAttemptsBeforeMarkingBlockMissing
|
|
||||||
+ " more times");
|
|
||||||
|
|
||||||
// Just set all the replicas to be chosen whether they are alive or not.
|
// Just set all the replicas to be chosen whether they are alive or not.
|
||||||
for (int i = 0; i < replicas.size(); i++) {
|
for (int i = 0; i < replicas.size(); i++) {
|
||||||
replicas.get(i).setChosenAsPrimary(false);
|
replicas.get(i).setChosenAsPrimary(false);
|
||||||
@ -352,10 +341,6 @@ void addReplicaIfNotPresent(DatanodeStorageInfo storage,
|
|||||||
replicas.add(new ReplicaUnderConstruction(block, storage, rState));
|
replicas.add(new ReplicaUnderConstruction(block, storage, rState));
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getNumRecoveryAttemptsLeft() {
|
|
||||||
return recoveryAttemptsBeforeMarkingBlockMissing;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert an under construction block to a complete block.
|
* Convert an under construction block to a complete block.
|
||||||
*
|
*
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -273,13 +274,6 @@ public int getPendingDataNodeMessageCount() {
|
|||||||
private BlockPlacementPolicy blockplacement;
|
private BlockPlacementPolicy blockplacement;
|
||||||
private final BlockStoragePolicySuite storagePolicySuite;
|
private final BlockStoragePolicySuite storagePolicySuite;
|
||||||
|
|
||||||
/** The number of times a block under construction's recovery will be
|
|
||||||
* attempted using all known replicas. e.g. if there are 3 replicas, each
|
|
||||||
* node will be tried 5 times (for a total of 15 retries across all nodes)*/
|
|
||||||
private static int maxBlockUCRecoveries =
|
|
||||||
DFSConfigKeys.DFS_BLOCK_UC_MAX_RECOVERY_ATTEMPTS_DEFAULT;
|
|
||||||
public static int getMaxBlockUCRecoveries() { return maxBlockUCRecoveries; }
|
|
||||||
|
|
||||||
/** Check whether name system is running before terminating */
|
/** Check whether name system is running before terminating */
|
||||||
private boolean checkNSRunning = true;
|
private boolean checkNSRunning = true;
|
||||||
|
|
||||||
@ -288,9 +282,6 @@ public BlockManager(final Namesystem namesystem, final Configuration conf)
|
|||||||
this.namesystem = namesystem;
|
this.namesystem = namesystem;
|
||||||
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
||||||
heartbeatManager = datanodeManager.getHeartbeatManager();
|
heartbeatManager = datanodeManager.getHeartbeatManager();
|
||||||
maxBlockUCRecoveries = conf.getInt(
|
|
||||||
DFSConfigKeys.DFS_BLOCK_UC_MAX_RECOVERY_ATTEMPTS,
|
|
||||||
DFSConfigKeys.DFS_BLOCK_UC_MAX_RECOVERY_ATTEMPTS_DEFAULT);
|
|
||||||
|
|
||||||
startupDelayBlockDeletionInMs = conf.getLong(
|
startupDelayBlockDeletionInMs = conf.getLong(
|
||||||
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
|
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
|
||||||
@ -740,8 +731,7 @@ private BlockInfo completeBlock(final BlockCollection bc,
|
|||||||
/**
|
/**
|
||||||
* Force the given block in the given file to be marked as complete,
|
* Force the given block in the given file to be marked as complete,
|
||||||
* regardless of whether enough replicas are present. This is necessary
|
* regardless of whether enough replicas are present. This is necessary
|
||||||
* when tailing edit logs as a Standby or when recovering a lease on a file
|
* when tailing edit logs as a Standby.
|
||||||
* with missing blocks.
|
|
||||||
*/
|
*/
|
||||||
public BlockInfo forceCompleteBlock(final BlockCollection bc,
|
public BlockInfo forceCompleteBlock(final BlockCollection bc,
|
||||||
final BlockInfoUnderConstruction block) throws IOException {
|
final BlockInfoUnderConstruction block) throws IOException {
|
||||||
|
@ -3287,16 +3287,6 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
|
|||||||
+ "Removed empty last block and closed file.");
|
+ "Removed empty last block and closed file.");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
//If the block's recovery has been attempted enough times, mark the block
|
|
||||||
//complete anyway and recover the lease
|
|
||||||
if(uc.getNumRecoveryAttemptsLeft() == 0) {
|
|
||||||
blockManager.forceCompleteBlock(pendingFile, uc);
|
|
||||||
finalizeINodeFileUnderConstruction(src, pendingFile,
|
|
||||||
iip.getLatestSnapshotId());
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// start recovery of the last block for this file
|
// start recovery of the last block for this file
|
||||||
long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
|
long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
|
||||||
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
||||||
|
@ -424,15 +424,6 @@
|
|||||||
<description>The lifetime of access tokens in minutes.</description>
|
<description>The lifetime of access tokens in minutes.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
|
||||||
<name>dfs.block.uc.max.recovery.attempts</name>
|
|
||||||
<value>5</value>
|
|
||||||
<description>The number of times a block under construction's recovery will be
|
|
||||||
attempted using all known replicas. e.g. if there are 3 replicas, each node
|
|
||||||
will be tried 5 times (for a total of 15 retries across all nodes).
|
|
||||||
</description>
|
|
||||||
</property>
|
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.datanode.data.dir</name>
|
<name>dfs.datanode.data.dir</name>
|
||||||
<value>file://${hadoop.tmp.dir}/dfs/data</value>
|
<value>file://${hadoop.tmp.dir}/dfs/data</value>
|
||||||
|
@ -27,7 +27,6 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -259,81 +258,4 @@ public void testLeaseRecoveryAndAppend() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test that when a client was writing to a file and died, and before the
|
|
||||||
* lease can be recovered, all the datanodes to which the file was written
|
|
||||||
* also die, after some time (5 * lease recovery times) the file is indeed
|
|
||||||
* closed and lease recovered.
|
|
||||||
* We also check that if the datanode came back after some time, the data
|
|
||||||
* originally written is not truncated
|
|
||||||
* @throws IOException
|
|
||||||
* @throws InterruptedException
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testLeaseRecoveryWithMissingBlocks()
|
|
||||||
throws IOException, InterruptedException {
|
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
|
|
||||||
//Start a cluster with 3 datanodes
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
|
||||||
cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD);
|
|
||||||
cluster.waitActive();
|
|
||||||
|
|
||||||
//create a file (with replication 1)
|
|
||||||
Path file = new Path("/testRecoveryFile");
|
|
||||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
|
||||||
FSDataOutputStream out = dfs.create(file, (short) 1);
|
|
||||||
|
|
||||||
//This keeps count of the number of bytes written (AND is also the data we
|
|
||||||
//are writing)
|
|
||||||
long writtenBytes = 0;
|
|
||||||
while (writtenBytes < 2 * 1024 * 1024) {
|
|
||||||
out.writeLong(writtenBytes);
|
|
||||||
writtenBytes += 8;
|
|
||||||
}
|
|
||||||
System.out.println("Written " + writtenBytes + " bytes");
|
|
||||||
out.hsync();
|
|
||||||
System.out.println("hsynced the data");
|
|
||||||
|
|
||||||
//Kill the datanode to which the file was written.
|
|
||||||
DatanodeInfo dn =
|
|
||||||
((DFSOutputStream) out.getWrappedStream()).getPipeline()[0];
|
|
||||||
DataNodeProperties dnStopped = cluster.stopDataNode(dn.getName());
|
|
||||||
|
|
||||||
//Wait at most 20 seconds for the lease to be recovered
|
|
||||||
LeaseManager lm = NameNodeAdapter.getLeaseManager(cluster.getNamesystem());
|
|
||||||
int i = 40;
|
|
||||||
while(i-- > 0 && lm.countLease() != 0) {
|
|
||||||
System.out.println("Still got " + lm.countLease() + " lease(s)");
|
|
||||||
Thread.sleep(500);
|
|
||||||
}
|
|
||||||
assertTrue("The lease was not recovered", lm.countLease() == 0);
|
|
||||||
System.out.println("Got " + lm.countLease() + " leases");
|
|
||||||
|
|
||||||
//Make sure we can't read any data because the datanode is dead
|
|
||||||
FSDataInputStream in = dfs.open(file);
|
|
||||||
try {
|
|
||||||
in.readLong();
|
|
||||||
assertTrue("Shouldn't have reached here", false);
|
|
||||||
} catch(BlockMissingException bme) {
|
|
||||||
System.out.println("Correctly got BlockMissingException because datanode"
|
|
||||||
+ " is still dead");
|
|
||||||
}
|
|
||||||
|
|
||||||
//Bring the dead datanode back.
|
|
||||||
cluster.restartDataNode(dnStopped);
|
|
||||||
System.out.println("Restart datanode");
|
|
||||||
|
|
||||||
//Make sure we can read all the data back (since we hsync'ed).
|
|
||||||
in = dfs.open(file);
|
|
||||||
int readBytes = 0;
|
|
||||||
while(in.available() != 0) {
|
|
||||||
assertEquals("Didn't read the data we wrote", in.readLong(), readBytes);
|
|
||||||
readBytes += 8;
|
|
||||||
}
|
|
||||||
assertEquals("Didn't get all the data", readBytes, writtenBytes);
|
|
||||||
System.out.println("Read back all the " + readBytes + " bytes");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user