HDFS-9236. Missing sanity check for block size during block recovery. (Tony Wu via Yongjun Zhang)
This commit is contained in:
parent
0b18e5e8c6
commit
b64242c0d2
@ -1629,6 +1629,9 @@ Release 2.8.0 - UNRELEASED
|
||||
HDFS-9377. Fix findbugs warnings in FSDirSnapshotOp.
|
||||
(Mingliang Liu via Yongjun Zhang)
|
||||
|
||||
HDFS-9236. Missing sanity check for block size during block recovery.
|
||||
(Tony Wu via Yongjun Zhang)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
@ -103,8 +103,13 @@ class RecoveryTaskContiguous {
|
||||
protected void recover() throws IOException {
|
||||
List<BlockRecord> syncList = new ArrayList<>(locs.length);
|
||||
int errorCount = 0;
|
||||
int candidateReplicaCnt = 0;
|
||||
|
||||
//check generation stamps
|
||||
// Check generation stamps, replica size and state. Replica must satisfy
|
||||
// the following criteria to be included in syncList for recovery:
|
||||
// - Valid generation stamp
|
||||
// - Non-zero length
|
||||
// - Original state is RWR or better
|
||||
for(DatanodeID id : locs) {
|
||||
try {
|
||||
DatanodeID bpReg =datanode.getBPOfferService(bpid).bpRegistration;
|
||||
@ -115,7 +120,28 @@ protected void recover() throws IOException {
|
||||
if (info != null &&
|
||||
info.getGenerationStamp() >= block.getGenerationStamp() &&
|
||||
info.getNumBytes() > 0) {
|
||||
syncList.add(new BlockRecord(id, proxyDN, info));
|
||||
// Count the number of candidate replicas received.
|
||||
++candidateReplicaCnt;
|
||||
if (info.getOriginalReplicaState().getValue() <=
|
||||
ReplicaState.RWR.getValue()) {
|
||||
syncList.add(new BlockRecord(id, proxyDN, info));
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Block recovery: Ignored replica with invalid " +
|
||||
"original state: " + info + " from DataNode: " + id);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (info == null) {
|
||||
LOG.debug("Block recovery: DataNode: " + id + " does not have "
|
||||
+ "replica for block: " + block);
|
||||
} else {
|
||||
LOG.debug("Block recovery: Ignored replica with invalid "
|
||||
+ "generation stamp or length: " + info + " from " +
|
||||
"DataNode: " + id);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (RecoveryInProgressException ripE) {
|
||||
InterDatanodeProtocol.LOG.warn(
|
||||
@ -136,6 +162,15 @@ protected void recover() throws IOException {
|
||||
+ ", datanodeids=" + Arrays.asList(locs));
|
||||
}
|
||||
|
||||
// None of the replicas reported by DataNodes has the required original
|
||||
// state, report the error.
|
||||
if (candidateReplicaCnt > 0 && syncList.isEmpty()) {
|
||||
throw new IOException("Found " + candidateReplicaCnt +
|
||||
" replica(s) for block " + block + " but none is in " +
|
||||
ReplicaState.RWR.name() + " or better state. datanodeids=" +
|
||||
Arrays.asList(locs));
|
||||
}
|
||||
|
||||
syncBlock(syncList);
|
||||
}
|
||||
|
||||
@ -157,6 +192,11 @@ void syncBlock(List<BlockRecord> syncList) throws IOException {
|
||||
// or their replicas have 0 length.
|
||||
// The block can be deleted.
|
||||
if (syncList.isEmpty()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("syncBlock for block " + block + ", all datanodes don't " +
|
||||
"have the block or their replicas have 0 length. The block can " +
|
||||
"be deleted.");
|
||||
}
|
||||
nn.commitBlockSynchronization(block, recoveryId, 0,
|
||||
true, true, DatanodeID.EMPTY_ARRAY, null);
|
||||
return;
|
||||
@ -195,6 +235,12 @@ void syncBlock(List<BlockRecord> syncList) throws IOException {
|
||||
r.rInfo.getNumBytes() == finalizedLength) {
|
||||
participatingList.add(r);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("syncBlock replicaInfo: block=" + block +
|
||||
", from datanode " + r.id + ", receivedState=" + rState.name() +
|
||||
", receivedLength=" + r.rInfo.getNumBytes() +
|
||||
", bestState=FINALIZED, finalizedLength=" + finalizedLength);
|
||||
}
|
||||
}
|
||||
newBlock.setNumBytes(finalizedLength);
|
||||
break;
|
||||
@ -207,7 +253,16 @@ void syncBlock(List<BlockRecord> syncList) throws IOException {
|
||||
minLength = Math.min(minLength, r.rInfo.getNumBytes());
|
||||
participatingList.add(r);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("syncBlock replicaInfo: block=" + block +
|
||||
", from datanode " + r.id + ", receivedState=" + rState.name() +
|
||||
", receivedLength=" + r.rInfo.getNumBytes() + ", bestState=" +
|
||||
bestState.name());
|
||||
}
|
||||
}
|
||||
// recover() guarantees syncList will have at least one replica with RWR
|
||||
// or better state.
|
||||
assert minLength != Long.MAX_VALUE : "wrong minLength";
|
||||
newBlock.setNumBytes(minLength);
|
||||
break;
|
||||
case RUR:
|
||||
@ -254,6 +309,13 @@ void syncBlock(List<BlockRecord> syncList) throws IOException {
|
||||
datanodes[i] = r.id;
|
||||
storages[i] = r.storageID;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Datanode triggering commitBlockSynchronization, block=" +
|
||||
block + ", newGs=" + newBlock.getGenerationStamp() +
|
||||
", newLength=" + newBlock.getNumBytes());
|
||||
}
|
||||
|
||||
nn.commitBlockSynchronization(block,
|
||||
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
|
||||
datanodes, storages);
|
||||
|
@ -49,4 +49,10 @@ public boolean equals(Object o) {
|
||||
public int hashCode() {
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString() + "[numBytes=" + this.getNumBytes() +
|
||||
",originalReplicaState=" + this.originalState.name() + "]";
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
@ -680,4 +681,40 @@ public void run() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* DNs report RUR instead of RBW, RWR or FINALIZED. Primary DN expected to
|
||||
* throw an exception.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRURReplicas() throws Exception {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Running " + GenericTestUtils.getMethodName());
|
||||
}
|
||||
|
||||
doReturn(new ReplicaRecoveryInfo(block.getBlockId(), block.getNumBytes(),
|
||||
block.getGenerationStamp(), ReplicaState.RUR)).when(spyDN).
|
||||
initReplicaRecovery(any(RecoveringBlock.class));
|
||||
|
||||
boolean exceptionThrown = false;
|
||||
try {
|
||||
for (RecoveringBlock rBlock : initRecoveringBlocks()) {
|
||||
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
|
||||
recoveryWorker.new RecoveryTaskContiguous(rBlock);
|
||||
BlockRecoveryWorker.RecoveryTaskContiguous spyTask =
|
||||
spy(RecoveryTaskContiguous);
|
||||
spyTask.recover();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// expect IOException to be thrown here
|
||||
e.printStackTrace();
|
||||
assertTrue("Wrong exception was thrown: " + e.getMessage(),
|
||||
e.getMessage().contains("Found 1 replica(s) for block " + block +
|
||||
" but none is in RWR or better state"));
|
||||
exceptionThrown = true;
|
||||
} finally {
|
||||
assertTrue(exceptionThrown);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user