HDFS-3875. Issue handling checksum errors in write pipeline. Contributed by Kihwal Lee.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1484808 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e5ed91aff7
commit
98a692fd63
@ -3064,6 +3064,8 @@ Release 0.23.8 - UNRELEASED
|
|||||||
HDFS-4805. Webhdfs client is fragile to token renewal errors
|
HDFS-4805. Webhdfs client is fragile to token renewal errors
|
||||||
(daryn via kihwal)
|
(daryn via kihwal)
|
||||||
|
|
||||||
|
HDFS-3875. Issue handling checksum errors in write pipeline. (kihwal)
|
||||||
|
|
||||||
Release 0.23.7 - UNRELEASED
|
Release 0.23.7 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -0,0 +1,45 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used for injecting faults in DFSClient and DFSOutputStream tests.
|
||||||
|
* Calls into this are a no-op in production code.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class DFSClientFaultInjector {
|
||||||
|
public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
|
||||||
|
|
||||||
|
public static DFSClientFaultInjector get() {
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean corruptPacket() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean uncorruptPacket() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
@ -254,8 +254,18 @@ void writeTo(DataOutputStream stm) throws IOException {
|
|||||||
System.arraycopy(header.getBytes(), 0, buf, headerStart,
|
System.arraycopy(header.getBytes(), 0, buf, headerStart,
|
||||||
header.getSerializedSize());
|
header.getSerializedSize());
|
||||||
|
|
||||||
|
// corrupt the data for testing.
|
||||||
|
if (DFSClientFaultInjector.get().corruptPacket()) {
|
||||||
|
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
|
||||||
|
}
|
||||||
|
|
||||||
// Write the now contiguous full packet to the output stream.
|
// Write the now contiguous full packet to the output stream.
|
||||||
stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
|
stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
|
||||||
|
|
||||||
|
// undo corruption.
|
||||||
|
if (DFSClientFaultInjector.get().uncorruptPacket()) {
|
||||||
|
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the packet's last byte's offset in the block
|
// get the packet's last byte's offset in the block
|
||||||
@ -323,6 +333,9 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
|
|||||||
|
|
||||||
/** Nodes have been used in the pipeline before and have failed. */
|
/** Nodes have been used in the pipeline before and have failed. */
|
||||||
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
|
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
|
||||||
|
/** The last ack sequence number before pipeline failure. */
|
||||||
|
private long lastAckedSeqnoBeforeFailure = -1;
|
||||||
|
private int pipelineRecoveryCount = 0;
|
||||||
/** Has the current block been hflushed? */
|
/** Has the current block been hflushed? */
|
||||||
private boolean isHflushed = false;
|
private boolean isHflushed = false;
|
||||||
/** Append on an existing block? */
|
/** Append on an existing block? */
|
||||||
@ -779,6 +792,23 @@ private boolean processDatanodeError() throws IOException {
|
|||||||
ackQueue.clear();
|
ackQueue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Record the new pipeline failure recovery.
|
||||||
|
if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
|
||||||
|
lastAckedSeqnoBeforeFailure = lastAckedSeqno;
|
||||||
|
pipelineRecoveryCount = 1;
|
||||||
|
} else {
|
||||||
|
// If we had to recover the pipeline five times in a row for the
|
||||||
|
// same packet, this client likely has corrupt data or corrupting
|
||||||
|
// during transmission.
|
||||||
|
if (++pipelineRecoveryCount > 5) {
|
||||||
|
DFSClient.LOG.warn("Error recovering pipeline for writing " +
|
||||||
|
block + ". Already retried 5 times for the same packet.");
|
||||||
|
lastException = new IOException("Failing write. Tried pipeline " +
|
||||||
|
"recovery 5 times without success.");
|
||||||
|
streamerClosed = true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
boolean doSleep = setupPipelineForAppendOrRecovery();
|
boolean doSleep = setupPipelineForAppendOrRecovery();
|
||||||
|
|
||||||
if (!streamerClosed && dfsClient.clientRunning) {
|
if (!streamerClosed && dfsClient.clientRunning) {
|
||||||
|
@ -377,7 +377,8 @@ private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
|
|||||||
clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
|
clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
|
||||||
} catch (ChecksumException ce) {
|
} catch (ChecksumException ce) {
|
||||||
LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
|
LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
|
||||||
if (srcDataNode != null) {
|
// No need to report to namenode when client is writing.
|
||||||
|
if (srcDataNode != null && isDatanode) {
|
||||||
try {
|
try {
|
||||||
LOG.info("report corrupt " + block + " from datanode " +
|
LOG.info("report corrupt " + block + " from datanode " +
|
||||||
srcDataNode + " to namenode");
|
srcDataNode + " to namenode");
|
||||||
@ -404,6 +405,19 @@ private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
|
|||||||
diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
|
diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether checksum needs to be verified.
|
||||||
|
* Skip verifying checksum iff this is not the last one in the
|
||||||
|
* pipeline and clientName is non-null. i.e. Checksum is verified
|
||||||
|
* on all the datanodes when the data is being written by a
|
||||||
|
* datanode rather than a client. Whe client is writing the data,
|
||||||
|
* protocol includes acks and only the last datanode needs to verify
|
||||||
|
* checksum.
|
||||||
|
* @return true if checksum verification is needed, otherwise false.
|
||||||
|
*/
|
||||||
|
private boolean shouldVerifyChecksum() {
|
||||||
|
return (mirrorOut == null || isDatanode || needsChecksumTranslation);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Receives and processes a packet. It can contain many chunks.
|
* Receives and processes a packet. It can contain many chunks.
|
||||||
@ -451,9 +465,9 @@ private int receivePacket() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// put in queue for pending acks, unless sync was requested
|
// put in queue for pending acks, unless sync was requested
|
||||||
if (responder != null && !syncBlock) {
|
if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
|
||||||
((PacketResponder) responder.getRunnable()).enqueue(seqno,
|
((PacketResponder) responder.getRunnable()).enqueue(seqno,
|
||||||
lastPacketInBlock, offsetInBlock);
|
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
//First write the packet to the mirror:
|
//First write the packet to the mirror:
|
||||||
@ -487,15 +501,24 @@ private int receivePacket() throws IOException {
|
|||||||
"length " + checksumLen);
|
"length " + checksumLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* skip verifying checksum iff this is not the last one in the
|
if (shouldVerifyChecksum()) {
|
||||||
* pipeline and clientName is non-null. i.e. Checksum is verified
|
try {
|
||||||
* on all the datanodes when the data is being written by a
|
|
||||||
* datanode rather than a client. Whe client is writing the data,
|
|
||||||
* protocol includes acks and only the last datanode needs to verify
|
|
||||||
* checksum.
|
|
||||||
*/
|
|
||||||
if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
|
|
||||||
verifyChunks(dataBuf, checksumBuf);
|
verifyChunks(dataBuf, checksumBuf);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// checksum error detected locally. there is no reason to continue.
|
||||||
|
if (responder != null) {
|
||||||
|
try {
|
||||||
|
((PacketResponder) responder.getRunnable()).enqueue(seqno,
|
||||||
|
lastPacketInBlock, offsetInBlock,
|
||||||
|
Status.ERROR_CHECKSUM);
|
||||||
|
// Wait until the responder sends back the response
|
||||||
|
// and interrupt this thread.
|
||||||
|
Thread.sleep(3000);
|
||||||
|
} catch (InterruptedException e) { }
|
||||||
|
}
|
||||||
|
throw new IOException("Terminating due to a checksum error." + ioe);
|
||||||
|
}
|
||||||
|
|
||||||
if (needsChecksumTranslation) {
|
if (needsChecksumTranslation) {
|
||||||
// overwrite the checksums in the packet buffer with the
|
// overwrite the checksums in the packet buffer with the
|
||||||
// appropriate polynomial for the disk storage.
|
// appropriate polynomial for the disk storage.
|
||||||
@ -584,9 +607,9 @@ private int receivePacket() throws IOException {
|
|||||||
|
|
||||||
// if sync was requested, put in queue for pending acks here
|
// if sync was requested, put in queue for pending acks here
|
||||||
// (after the fsync finished)
|
// (after the fsync finished)
|
||||||
if (responder != null && syncBlock) {
|
if (responder != null && (syncBlock || shouldVerifyChecksum())) {
|
||||||
((PacketResponder) responder.getRunnable()).enqueue(seqno,
|
((PacketResponder) responder.getRunnable()).enqueue(seqno,
|
||||||
lastPacketInBlock, offsetInBlock);
|
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (throttler != null) { // throttle I/O
|
if (throttler != null) { // throttle I/O
|
||||||
@ -783,7 +806,7 @@ private static enum PacketResponderType {
|
|||||||
private static Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
|
private static Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Processed responses from downstream datanodes in the pipeline
|
* Processes responses from downstream datanodes in the pipeline
|
||||||
* and sends back replies to the originator.
|
* and sends back replies to the originator.
|
||||||
*/
|
*/
|
||||||
class PacketResponder implements Runnable, Closeable {
|
class PacketResponder implements Runnable, Closeable {
|
||||||
@ -836,9 +859,9 @@ private boolean isRunning() {
|
|||||||
* @param offsetInBlock
|
* @param offsetInBlock
|
||||||
*/
|
*/
|
||||||
void enqueue(final long seqno, final boolean lastPacketInBlock,
|
void enqueue(final long seqno, final boolean lastPacketInBlock,
|
||||||
final long offsetInBlock) {
|
final long offsetInBlock, final Status ackStatus) {
|
||||||
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
|
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
|
||||||
System.nanoTime());
|
System.nanoTime(), ackStatus);
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug(myString + ": enqueue " + p);
|
LOG.debug(myString + ": enqueue " + p);
|
||||||
}
|
}
|
||||||
@ -976,7 +999,8 @@ public void run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
sendAckUpstream(ack, expected, totalAckTimeNanos,
|
sendAckUpstream(ack, expected, totalAckTimeNanos,
|
||||||
(pkt != null ? pkt.offsetInBlock : 0));
|
(pkt != null ? pkt.offsetInBlock : 0),
|
||||||
|
(pkt != null ? pkt.ackStatus : Status.SUCCESS));
|
||||||
if (pkt != null) {
|
if (pkt != null) {
|
||||||
// remove the packet from the ack queue
|
// remove the packet from the ack queue
|
||||||
removeAckHead();
|
removeAckHead();
|
||||||
@ -1038,7 +1062,8 @@ private void finalizeBlock(long startTime) throws IOException {
|
|||||||
* @param offsetInBlock offset in block for the data in packet
|
* @param offsetInBlock offset in block for the data in packet
|
||||||
*/
|
*/
|
||||||
private void sendAckUpstream(PipelineAck ack, long seqno,
|
private void sendAckUpstream(PipelineAck ack, long seqno,
|
||||||
long totalAckTimeNanos, long offsetInBlock) throws IOException {
|
long totalAckTimeNanos, long offsetInBlock,
|
||||||
|
Status myStatus) throws IOException {
|
||||||
Status[] replies = null;
|
Status[] replies = null;
|
||||||
if (mirrorError) { // ack read error
|
if (mirrorError) { // ack read error
|
||||||
replies = MIRROR_ERROR_STATUS;
|
replies = MIRROR_ERROR_STATUS;
|
||||||
@ -1046,10 +1071,19 @@ private void sendAckUpstream(PipelineAck ack, long seqno,
|
|||||||
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
|
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
|
||||||
.getNumOfReplies();
|
.getNumOfReplies();
|
||||||
replies = new Status[1 + ackLen];
|
replies = new Status[1 + ackLen];
|
||||||
replies[0] = Status.SUCCESS;
|
replies[0] = myStatus;
|
||||||
for (int i = 0; i < ackLen; i++) {
|
for (int i = 0; i < ackLen; i++) {
|
||||||
replies[i + 1] = ack.getReply(i);
|
replies[i + 1] = ack.getReply(i);
|
||||||
}
|
}
|
||||||
|
// If the mirror has reported that it received a corrupt packet,
|
||||||
|
// do self-destruct to mark myself bad, instead of making the
|
||||||
|
// mirror node bad. The mirror is guaranteed to be good without
|
||||||
|
// corrupt data on disk.
|
||||||
|
if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
|
||||||
|
throw new IOException("Shutting down writer and responder "
|
||||||
|
+ "since the down streams reported the data sent by this "
|
||||||
|
+ "thread is corrupt");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
PipelineAck replyAck = new PipelineAck(seqno, replies,
|
PipelineAck replyAck = new PipelineAck(seqno, replies,
|
||||||
totalAckTimeNanos);
|
totalAckTimeNanos);
|
||||||
@ -1064,6 +1098,14 @@ private void sendAckUpstream(PipelineAck ack, long seqno,
|
|||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(myString + ", replyAck=" + replyAck);
|
LOG.debug(myString + ", replyAck=" + replyAck);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If a corruption was detected in the received data, terminate after
|
||||||
|
// sending ERROR_CHECKSUM back.
|
||||||
|
if (myStatus == Status.ERROR_CHECKSUM) {
|
||||||
|
throw new IOException("Shutting down writer and responder "
|
||||||
|
+ "due to a checksum error in received data. The error "
|
||||||
|
+ "response has been sent upstream.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1085,13 +1127,15 @@ private static class Packet {
|
|||||||
final boolean lastPacketInBlock;
|
final boolean lastPacketInBlock;
|
||||||
final long offsetInBlock;
|
final long offsetInBlock;
|
||||||
final long ackEnqueueNanoTime;
|
final long ackEnqueueNanoTime;
|
||||||
|
final Status ackStatus;
|
||||||
|
|
||||||
Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock,
|
Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock,
|
||||||
long ackEnqueueNanoTime) {
|
long ackEnqueueNanoTime, Status ackStatus) {
|
||||||
this.seqno = seqno;
|
this.seqno = seqno;
|
||||||
this.lastPacketInBlock = lastPacketInBlock;
|
this.lastPacketInBlock = lastPacketInBlock;
|
||||||
this.offsetInBlock = offsetInBlock;
|
this.offsetInBlock = offsetInBlock;
|
||||||
this.ackEnqueueNanoTime = ackEnqueueNanoTime;
|
this.ackEnqueueNanoTime = ackEnqueueNanoTime;
|
||||||
|
this.ackStatus = ackStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1100,6 +1144,7 @@ public String toString() {
|
|||||||
+ ", lastPacketInBlock=" + lastPacketInBlock
|
+ ", lastPacketInBlock=" + lastPacketInBlock
|
||||||
+ ", offsetInBlock=" + offsetInBlock
|
+ ", offsetInBlock=" + offsetInBlock
|
||||||
+ ", ackEnqueueNanoTime=" + ackEnqueueNanoTime
|
+ ", ackEnqueueNanoTime=" + ackEnqueueNanoTime
|
||||||
|
+ ", ackStatus=" + ackStatus
|
||||||
+ ")";
|
+ ")";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -749,13 +749,25 @@ public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check replica length
|
// check replica length
|
||||||
if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
|
long bytesAcked = rbw.getBytesAcked();
|
||||||
|
long numBytes = rbw.getNumBytes();
|
||||||
|
if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
|
||||||
throw new ReplicaNotFoundException("Unmatched length replica " +
|
throw new ReplicaNotFoundException("Unmatched length replica " +
|
||||||
replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() +
|
replicaInfo + ": BytesAcked = " + bytesAcked +
|
||||||
" BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" +
|
" BytesRcvd = " + numBytes + " are not in the range of [" +
|
||||||
minBytesRcvd + ", " + maxBytesRcvd + "].");
|
minBytesRcvd + ", " + maxBytesRcvd + "].");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Truncate the potentially corrupt portion.
|
||||||
|
// If the source was client and the last node in the pipeline was lost,
|
||||||
|
// any corrupt data written after the acked length can go unnoticed.
|
||||||
|
if (numBytes > bytesAcked) {
|
||||||
|
final File replicafile = rbw.getBlockFile();
|
||||||
|
truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
|
||||||
|
rbw.setNumBytes(bytesAcked);
|
||||||
|
rbw.setLastChecksumAndDataLen(bytesAcked, null);
|
||||||
|
}
|
||||||
|
|
||||||
// bump the replica's generation stamp to newGS
|
// bump the replica's generation stamp to newGS
|
||||||
bumpReplicaGS(rbw, newGS);
|
bumpReplicaGS(rbw, newGS);
|
||||||
|
|
||||||
|
@ -31,11 +31,18 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClientFaultInjector;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A JUnit test for corrupted file handling.
|
* A JUnit test for corrupted file handling.
|
||||||
* This test creates a bunch of files/directories with replication
|
* This test creates a bunch of files/directories with replication
|
||||||
@ -64,6 +71,79 @@
|
|||||||
* replica was created from the non-corrupted replica.
|
* replica was created from the non-corrupted replica.
|
||||||
*/
|
*/
|
||||||
public class TestCrcCorruption {
|
public class TestCrcCorruption {
|
||||||
|
|
||||||
|
private DFSClientFaultInjector faultInjector;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
faultInjector = Mockito.mock(DFSClientFaultInjector.class);
|
||||||
|
DFSClientFaultInjector.instance = faultInjector;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test case for data corruption during data transmission for
|
||||||
|
* create/write. To recover from corruption while writing, at
|
||||||
|
* least two replicas are needed.
|
||||||
|
*/
|
||||||
|
@Test(timeout=50000)
|
||||||
|
public void testCorruptionDuringWrt() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
Path file = new Path("/test_corruption_file");
|
||||||
|
FSDataOutputStream out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024));
|
||||||
|
byte[] data = new byte[65536];
|
||||||
|
for (int i=0; i < 65536; i++) {
|
||||||
|
data[i] = (byte)(i % 256);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
out.write(data, 0, 65535);
|
||||||
|
}
|
||||||
|
out.hflush();
|
||||||
|
// corrupt the packet once
|
||||||
|
Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false);
|
||||||
|
Mockito.when(faultInjector.uncorruptPacket()).thenReturn(true, false);
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
out.write(data, 0, 65535);
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
// read should succeed
|
||||||
|
FSDataInputStream in = fs.open(file);
|
||||||
|
for(int c; (c = in.read()) != -1; );
|
||||||
|
in.close();
|
||||||
|
|
||||||
|
// test the retry limit
|
||||||
|
out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024));
|
||||||
|
|
||||||
|
// corrupt the packet once and never fix it.
|
||||||
|
Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false);
|
||||||
|
Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false);
|
||||||
|
|
||||||
|
// the client should give up pipeline reconstruction after retries.
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
out.write(data, 0, 65535);
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
fail("Write did not fail");
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// we should get an ioe
|
||||||
|
DFSClient.LOG.info("Got expected exception", ioe);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) { cluster.shutdown(); }
|
||||||
|
Mockito.when(faultInjector.corruptPacket()).thenReturn(false);
|
||||||
|
Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* check if DFS can handle corrupted CRC blocks
|
* check if DFS can handle corrupted CRC blocks
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user