HDFS-11472. Fix inconsistent replica size after a data pipeline failure. Contributed by Erik Krogen and Wei-Chiu Chuang.
parent b3269f7cc1
commit 8c2c812832
@@ -1424,13 +1424,27 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
             minBytesRcvd + ", " + maxBytesRcvd + "].");
       }
 
+      long bytesOnDisk = rbw.getBytesOnDisk();
+      long blockDataLength = rbw.getReplicaInfo().getBlockDataLength();
+      if (bytesOnDisk != blockDataLength) {
+        LOG.info("Resetting bytesOnDisk to match blockDataLength (={}) for " +
+            "replica {}", blockDataLength, rbw);
+        bytesOnDisk = blockDataLength;
+        rbw.setLastChecksumAndDataLen(bytesOnDisk, null);
+      }
+
+      if (bytesOnDisk < bytesAcked) {
+        throw new ReplicaNotFoundException("Found fewer bytesOnDisk than " +
+            "bytesAcked for replica " + rbw);
+      }
+
       FsVolumeReference ref = rbw.getReplicaInfo()
           .getVolume().obtainReference();
       try {
         // Truncate the potentially corrupt portion.
         // If the source was client and the last node in the pipeline was lost,
         // any corrupt data written after the acked length can go unnoticed.
-        if (numBytes > bytesAcked) {
+        if (bytesOnDisk > bytesAcked) {
           rbw.getReplicaInfo().truncateBlock(bytesAcked);
           rbw.setNumBytes(bytesAcked);
           rbw.setLastChecksumAndDataLen(bytesAcked, null);
@@ -2460,8 +2474,8 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
 
     //check replica bytes on disk.
     if (replica.getBytesOnDisk() < replica.getVisibleLength()) {
-      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
-          + " getBytesOnDisk() < getVisibleLength(), rip=" + replica);
+      throw new IOException("getBytesOnDisk() < getVisibleLength(), rip="
+          + replica);
     }
 
     //check the replica's files
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
@@ -302,6 +303,15 @@ public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
     rbw.getBlockFile().createNewFile();
     rbw.getMetaFile().createNewFile();
     dataset.volumeMap.add(bpid, rbw);
+
+    FileIoProvider fileIoProvider = rbw.getFileIoProvider();
+
+    try (RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
+        volume, rbw.getBlockFile(), "rw")) {
+      //extend blockFile
+      blockRAF.setLength(eb.getNumBytes());
+    }
+    saveMetaFileHeader(rbw.getMetaFile());
     return rbw;
   }
 
@@ -19,8 +19,10 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,12 +38,14 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
@@ -154,7 +158,7 @@ private ExtendedBlock[] setup(String bpid, FsDatasetTestUtils testUtils)
 
     ExtendedBlock[] blocks = new ExtendedBlock[] {
         new ExtendedBlock(bpid, 1, 1, 2001), new ExtendedBlock(bpid, 2, 1, 2002),
-        new ExtendedBlock(bpid, 3, 1, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
+        new ExtendedBlock(bpid, 3, 2, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
         new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
     };
 
@@ -552,7 +556,52 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception {
       cluster.shutdown();
     }
   }
 
+  /**
+   * Test that we can successfully recover a {@link ReplicaBeingWritten}
+   * which has inconsistent metadata (bytes were written to disk but bytesOnDisk
+   * was not updated) but that recovery fails when the block is actually
+   * corrupt (bytes are not present on disk).
+   */
+  @Test
+  public void testRecoverInconsistentRbw() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetImpl fsDataset = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
+
+    // set up replicasMap
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
+
+    ReplicaBeingWritten rbw = (ReplicaBeingWritten)fsDataset.
+        getReplicaInfo(bpid, blocks[RBW].getBlockId());
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    // simulate an inconsistent replica length update by reducing in-memory
+    // value of on disk length
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+        rbw.getNumBytes());
+    // after the recovery, on disk length should equal acknowledged length.
+    Assert.assertTrue(rbw.getBytesOnDisk() == rbw.getBytesAcked());
+
+    // reduce on disk length again; this time actually truncate the file to
+    // simulate the data not being present
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    try (RandomAccessFile blockRAF = rbw.getFileIoProvider().
+        getRandomAccessFile(rbw.getVolume(), rbw.getBlockFile(), "rw")) {
+      // truncate blockFile
+      blockRAF.setLength(bytesOnDisk - 1);
+      fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+          rbw.getNumBytes());
+      fail("recovery should have failed");
+    } catch (ReplicaNotFoundException rnfe) {
+      GenericTestUtils.assertExceptionContains("Found fewer bytesOnDisk than " +
+          "bytesAcked for replica", rnfe);
+    }
+  }
+
   /**
    * Compare the replica map before and after the restart
    **/