HDFS-15725. Lease Recovery never completes for a committed block which the DNs never finalize. Contributed by Stephen O'Donnell
This commit is contained in:
parent
6de1a8eb67
commit
9ed737001c
@ -3711,17 +3711,6 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
|
|||||||
" internalReleaseLease: Committed blocks are minimally" +
|
" internalReleaseLease: Committed blocks are minimally" +
|
||||||
" replicated, lease removed, file" + src + " closed.");
|
" replicated, lease removed, file" + src + " closed.");
|
||||||
return true; // closed!
|
return true; // closed!
|
||||||
} else if (penultimateBlockMinStorage && lastBlock.getNumBytes() == 0) {
|
|
||||||
// HDFS-14498 - this is a file with a final block of zero bytes and was
|
|
||||||
// likely left in this state by a client which exited unexpectedly
|
|
||||||
pendingFile.removeLastBlock(lastBlock);
|
|
||||||
finalizeINodeFileUnderConstruction(src, pendingFile,
|
|
||||||
iip.getLatestSnapshotId(), false);
|
|
||||||
NameNode.stateChangeLog.warn("BLOCK*" +
|
|
||||||
" internalReleaseLease: Committed last block is zero bytes with" +
|
|
||||||
" insufficient replicas. Final block removed, lease removed, file "
|
|
||||||
+ src + " closed.");
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
// Cannot close file right now, since some blocks
|
// Cannot close file right now, since some blocks
|
||||||
// are not yet minimally replicated.
|
// are not yet minimally replicated.
|
||||||
@ -3729,10 +3718,13 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
|
|||||||
// if there are no valid replicas on data-nodes.
|
// if there are no valid replicas on data-nodes.
|
||||||
String message = "DIR* NameSystem.internalReleaseLease: " +
|
String message = "DIR* NameSystem.internalReleaseLease: " +
|
||||||
"Failed to release lease for file " + src +
|
"Failed to release lease for file " + src +
|
||||||
". Committed blocks are waiting to be minimally replicated." +
|
". Committed blocks are waiting to be minimally replicated.";
|
||||||
" Try again later.";
|
|
||||||
NameNode.stateChangeLog.warn(message);
|
NameNode.stateChangeLog.warn(message);
|
||||||
throw new AlreadyBeingCreatedException(message);
|
if (!penultimateBlockMinStorage) {
|
||||||
|
throw new AlreadyBeingCreatedException(message);
|
||||||
|
}
|
||||||
|
// Intentionally fall through to UNDER_RECOVERY so BLOCK_RECOVERY is
|
||||||
|
// attempted
|
||||||
case UNDER_CONSTRUCTION:
|
case UNDER_CONSTRUCTION:
|
||||||
case UNDER_RECOVERY:
|
case UNDER_RECOVERY:
|
||||||
BlockUnderConstructionFeature uc =
|
BlockUnderConstructionFeature uc =
|
||||||
|
@ -17,36 +17,46 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -351,7 +361,13 @@ public void testLeaseRecoveryEmptyCommittedLastBlock() throws Exception {
|
|||||||
String file = "/test/f1";
|
String file = "/test/f1";
|
||||||
Path filePath = new Path(file);
|
Path filePath = new Path(file);
|
||||||
|
|
||||||
createCommittedNotCompleteFile(client, file);
|
createCommittedNotCompleteFile(client, file, null, 1);
|
||||||
|
|
||||||
|
INodeFile inode = cluster.getNamesystem().getFSDirectory()
|
||||||
|
.getINode(filePath.toString()).asFile();
|
||||||
|
assertTrue(inode.isUnderConstruction());
|
||||||
|
assertEquals(1, inode.numBlocks());
|
||||||
|
assertNotNull(inode.getLastBlock());
|
||||||
|
|
||||||
// Ensure a different client cannot append the file
|
// Ensure a different client cannot append the file
|
||||||
try {
|
try {
|
||||||
@ -361,9 +377,18 @@ public void testLeaseRecoveryEmptyCommittedLastBlock() throws Exception {
|
|||||||
assertTrue(e.getMessage().contains("file lease is currently owned"));
|
assertTrue(e.getMessage().contains("file lease is currently owned"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure the lease can be recovered on the first try
|
// Lease will not be recovered on the first try
|
||||||
boolean recovered = client.recoverLease(file);
|
assertEquals(false, client.recoverLease(file));
|
||||||
assertEquals(true, recovered);
|
for (int i=0; i < 10 && !client.recoverLease(file); i++) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
assertTrue(client.recoverLease(file));
|
||||||
|
|
||||||
|
inode = cluster.getNamesystem().getFSDirectory()
|
||||||
|
.getINode(filePath.toString()).asFile();
|
||||||
|
assertTrue(!inode.isUnderConstruction());
|
||||||
|
assertEquals(0, inode.numBlocks());
|
||||||
|
assertNull(inode.getLastBlock());
|
||||||
|
|
||||||
// Ensure the recovered file can now be written
|
// Ensure the recovered file can now be written
|
||||||
FSDataOutputStream append = dfs.append(filePath);
|
FSDataOutputStream append = dfs.append(filePath);
|
||||||
@ -395,7 +420,7 @@ public void testLeaseManagerRecoversEmptyCommittedLastBlock()
|
|||||||
new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
|
new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
|
||||||
String file = "/test/f1";
|
String file = "/test/f1";
|
||||||
|
|
||||||
createCommittedNotCompleteFile(client, file);
|
createCommittedNotCompleteFile(client, file, null, 1);
|
||||||
waitLeaseRecovery(cluster);
|
waitLeaseRecovery(cluster);
|
||||||
|
|
||||||
GenericTestUtils.waitFor(() -> {
|
GenericTestUtils.waitFor(() -> {
|
||||||
@ -415,23 +440,167 @@ public void testLeaseManagerRecoversEmptyCommittedLastBlock()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createCommittedNotCompleteFile(DFSClient client, String file)
|
@Test
|
||||||
throws IOException {
|
public void testAbortedRecovery() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
DFSClient client = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
client =
|
||||||
|
new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
|
||||||
|
final String file = "/test/f1";
|
||||||
|
|
||||||
|
HdfsFileStatus stat = client.getNamenode()
|
||||||
|
.create(file, new FsPermission("777"), client.clientName,
|
||||||
|
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
|
||||||
|
true, (short) 1, 1024 * 1024 * 128L,
|
||||||
|
new CryptoProtocolVersion[0], null, null);
|
||||||
|
|
||||||
|
assertNotNull(NameNodeAdapter.getLeaseHolderForPath(
|
||||||
|
cluster.getNameNode(), file));
|
||||||
|
|
||||||
|
// Add a block to the file
|
||||||
|
ExtendedBlock block = client.getNamenode().addBlock(
|
||||||
|
file, client.clientName, null, new DatanodeInfo[0], stat.getFileId(),
|
||||||
|
new String[0], null).getBlock();
|
||||||
|
|
||||||
|
// update the pipeline to get a new genstamp.
|
||||||
|
ExtendedBlock updatedBlock = client.getNamenode()
|
||||||
|
.updateBlockForPipeline(block, client.clientName)
|
||||||
|
.getBlock();
|
||||||
|
// fake that some data was maybe written. commit block sync will
|
||||||
|
// reconcile.
|
||||||
|
updatedBlock.setNumBytes(1234);
|
||||||
|
|
||||||
|
// get the stored block and make it look like the DN sent a RBW IBR.
|
||||||
|
BlockManager bm = cluster.getNamesystem().getBlockManager();
|
||||||
|
BlockInfo storedBlock = bm.getStoredBlock(block.getLocalBlock());
|
||||||
|
BlockUnderConstructionFeature uc =
|
||||||
|
storedBlock.getUnderConstructionFeature();
|
||||||
|
uc.setExpectedLocations(updatedBlock.getLocalBlock(),
|
||||||
|
uc.getExpectedStorageLocations(), BlockType.CONTIGUOUS);
|
||||||
|
|
||||||
|
// complete the file w/o updatePipeline to simulate client failure.
|
||||||
|
client.getNamenode().complete(file, client.clientName, block,
|
||||||
|
stat.getFileId());
|
||||||
|
|
||||||
|
assertNotNull(NameNodeAdapter.getLeaseHolderForPath(
|
||||||
|
cluster.getNameNode(), file));
|
||||||
|
|
||||||
|
cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD);
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
String holder = NameNodeAdapter
|
||||||
|
.getLeaseHolderForPath(cluster.getNameNode(), file);
|
||||||
|
return holder == null;
|
||||||
|
}
|
||||||
|
}, 100, 20000);
|
||||||
|
// nothing was actually written so the block should be dropped.
|
||||||
|
assertTrue(storedBlock.isDeleted());
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
}
|
||||||
|
if (client != null) {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLeaseManagerRecoversCommittedLastBlockWithContent()
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
DFSClient client = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
|
client =
|
||||||
|
new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
|
||||||
|
String file = "/test/f2";
|
||||||
|
|
||||||
|
byte[] bytesToWrite = new byte[1];
|
||||||
|
bytesToWrite[0] = 123;
|
||||||
|
createCommittedNotCompleteFile(client, file, bytesToWrite, 3);
|
||||||
|
|
||||||
|
waitLeaseRecovery(cluster);
|
||||||
|
|
||||||
|
DistributedFileSystem hdfs = cluster.getFileSystem();
|
||||||
|
|
||||||
|
// Now the lease has been recovered, attempt to append the file and then
|
||||||
|
// ensure the earlier written and newly written data can be read back.
|
||||||
|
FSDataOutputStream op = null;
|
||||||
|
try {
|
||||||
|
op = hdfs.append(new Path(file));
|
||||||
|
op.write(23);
|
||||||
|
} finally {
|
||||||
|
if (op != null) {
|
||||||
|
op.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FSDataInputStream stream = null;
|
||||||
|
try {
|
||||||
|
stream = cluster.getFileSystem().open(new Path(file));
|
||||||
|
assertEquals(123, stream.readByte());
|
||||||
|
assertEquals(23, stream.readByte());
|
||||||
|
} finally {
|
||||||
|
stream.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally check there are no leases for the file and hence the file is
|
||||||
|
// closed.
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
String holder = NameNodeAdapter
|
||||||
|
.getLeaseHolderForPath(cluster.getNameNode(), file);
|
||||||
|
return holder == null;
|
||||||
|
}, 100, 10000);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
}
|
||||||
|
if (client != null) {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createCommittedNotCompleteFile(DFSClient client, String file,
|
||||||
|
byte[] bytesToWrite, int repFactor) throws IOException {
|
||||||
HdfsFileStatus stat = client.getNamenode()
|
HdfsFileStatus stat = client.getNamenode()
|
||||||
.create(file, new FsPermission("777"), "test client",
|
.create(file, new FsPermission("777"), client.clientName,
|
||||||
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
|
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
|
||||||
true, (short) 1, 1024 * 1024 * 128L,
|
true, (short) repFactor, 1024 * 1024 * 128L,
|
||||||
new CryptoProtocolVersion[0], null, null);
|
new CryptoProtocolVersion[0], null, null);
|
||||||
// Add a block to the file
|
// Add a block to the file
|
||||||
LocatedBlock blk = client.getNamenode()
|
LocatedBlock blk = client.getNamenode()
|
||||||
.addBlock(file, "test client", null,
|
.addBlock(file, client.clientName, null,
|
||||||
new DatanodeInfo[0], stat.getFileId(), new String[0], null);
|
new DatanodeInfo[0], stat.getFileId(), new String[0], null);
|
||||||
// Without writing anything to the file, or setting up the DN pipeline
|
ExtendedBlock finalBlock = blk.getBlock();
|
||||||
// attempt to close the file. This will fail (return false) as the NN will
|
if (bytesToWrite != null) {
|
||||||
|
// Here we create an output stream and then abort it so the block gets
|
||||||
|
// created on the datanode, but we never send the message to tell the DN
|
||||||
|
// to complete the block. This simulates the client crashing after it
|
||||||
|
// wrote the data, but before the file gets closed.
|
||||||
|
DFSOutputStream s = new DFSOutputStream(client, file, stat,
|
||||||
|
EnumSet.of(CreateFlag.CREATE), null,
|
||||||
|
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512),
|
||||||
|
null, true);
|
||||||
|
s.start();
|
||||||
|
s.write(bytesToWrite);
|
||||||
|
s.hflush();
|
||||||
|
finalBlock = s.getBlock();
|
||||||
|
s.abort();
|
||||||
|
}
|
||||||
|
// Attempt to close the file. This will fail (return false) as the NN will
|
||||||
// be expecting the registered block to be reported from the DNs via IBR,
|
// be expecting the registered block to be reported from the DNs via IBR,
|
||||||
// but that will never happen, as the pipeline was never established
|
// but that will never happen, as we either did not write it, or we aborted
|
||||||
|
// the stream before the "close block" message could be sent to the DN.
|
||||||
boolean closed = client.getNamenode().complete(
|
boolean closed = client.getNamenode().complete(
|
||||||
file, "test client", blk.getBlock(), stat.getFileId());
|
file, client.clientName, finalBlock, stat.getFileId());
|
||||||
assertEquals(false, closed);
|
assertEquals(false, closed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user