HDFS-8838. Erasure Coding: Tolerate datanode failures in DFSStripedOutputStream when the data length is small. Contributed by Tsz Wo Nicholas Sze.

Walter Su 2015-08-27 09:09:52 +08:00
parent 6b6a63bbbd
commit 067ec8c2b1
12 changed files with 385 additions and 115 deletions

View File

@@ -187,4 +187,10 @@ public boolean isShutdownInProgress() {
     return shutdownInProgress.get();
   }
 
+  /**
+   * clear all registered shutdownHooks.
+   */
+  public void clearShutdownHooks() {
+    hooks.clear();
+  }
 }
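The new clearShutdownHooks() method exists so that tests which start and stop many MiniDFSCluster instances in a single JVM can drop the hooks registered by earlier clusters; the MiniDFSCluster hunk further down calls it from shutdown(). A minimal sketch of the usage pattern this enables (the loop below is illustrative only, not code from this patch):

// Illustrative: repeated cluster lifecycles in one JVM. Each cluster
// registers shutdown hooks; clearing them in shutdown() keeps the
// global hook list from growing across iterations.
for (int dn = 0; dn < 9; dn++) {
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    // run one datanode-failure scenario against the cluster
  } finally {
    cluster.shutdown(); // now also calls ShutdownHookManager.get().clearShutdownHooks()
  }
}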

View File

@@ -400,3 +400,6 @@
     HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the
     blocklocations which doesn't satisfy BlockGroupSize. (Rakesh R via zhz)
 
+    HDFS-8838. Erasure Coding: Tolerate datanode failures in DFSStripedOutputStream
+    when the data length is small. (szetszwo via waltersu4549)

View File

@@ -406,13 +406,13 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
     if (currentPacket == null) {
       currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
           .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("WriteChunk allocating new packet seqno=" +
             currentPacket.getSeqno() +
             ", src=" + src +
             ", packetSize=" + packetSize +
             ", chunksPerPacket=" + chunksPerPacket +
-            ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
+            ", bytesCurBlock=" + getStreamer().getBytesCurBlock() + ", " + this);
       }
     }

View File

@@ -170,15 +170,18 @@ ExtendedBlock getBlockGroup() {
       }
       final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
       final ExtendedBlock block = new ExtendedBlock(b0);
-      long numBytes = b0.getNumBytes();
-      for (int i = 1; i < numDataBlocks; i++) {
+      long numBytes = atBlockGroupBoundary? b0.getNumBytes(): s0.getBytesCurBlock();
+      for (int i = 1; i < numAllBlocks; i++) {
         final StripedDataStreamer si = getStripedDataStreamer(i);
         final ExtendedBlock bi = si.getBlock();
         if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
           block.setGenerationStamp(bi.getGenerationStamp());
         }
-        numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
+        if (i < numDataBlocks) {
+          numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
+        }
       }
       block.setNumBytes(numBytes);
       if (LOG.isDebugEnabled()) {
@@ -318,8 +321,7 @@ private synchronized StripedDataStreamer getCurrentStreamer() {
     return (StripedDataStreamer)streamer;
   }
 
-  private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
-      throws IOException {
+  private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
     // backup currentPacket for current streamer
     int oldIdx = streamers.indexOf(streamer);
     if (oldIdx >= 0) {
@@ -349,11 +351,11 @@ private static void encode(RawErasureEncoder encoder, int numData,
   }
 
-  private void checkStreamers() throws IOException {
+  private void checkStreamers(boolean setExternalError) throws IOException {
     int count = 0;
     for(StripedDataStreamer s : streamers) {
       if (!s.isFailed()) {
-        if (s.getBlock() != null) {
+        if (setExternalError && s.getBlock() != null) {
           s.getErrorState().initExternalError();
         }
         count++;
@@ -369,11 +371,16 @@ private void checkStreamers() throws IOException {
     }
   }
 
-  private void handleStreamerFailure(String err,
-      Exception e) throws IOException {
+  private void handleStreamerFailure(String err, Exception e)
+      throws IOException {
+    handleStreamerFailure(err, e, true);
+  }
+
+  private void handleStreamerFailure(String err, Exception e,
+      boolean setExternalError) throws IOException {
     LOG.warn("Failed: " + err + ", " + this, e);
     getCurrentStreamer().setFailed(true);
-    checkStreamers();
+    checkStreamers(setExternalError);
     currentPacket = null;
   }
@@ -505,10 +512,10 @@ private long getCurrentSumBytes() {
     return sum;
   }
 
-  private void writeParityCellsForLastStripe() throws IOException {
+  private boolean generateParityCellsForLastStripe() {
     final long currentBlockGroupBytes = getCurrentSumBytes();
     if (currentBlockGroupBytes % stripeDataSize() == 0) {
-      return;
+      return false;
     }
 
     final int firstCellSize =
@@ -530,8 +537,7 @@ private void writeParityCellsForLastStripe() throws IOException {
       }
       buffers[i].flip();
     }
-
-    writeParityCells();
+    return true;
   }
 
   void writeParityCells() throws IOException {
@@ -603,12 +609,14 @@ protected synchronized void closeImpl() throws IOException {
     // flush from all upper layers
     try {
       flushBuffer();
-      // if the last stripe is incomplete, generate and write parity cells
-      writeParityCellsForLastStripe();
-      enqueueAllCurrentPackets();
     } catch(Exception e) {
-      handleStreamerFailure("closeImpl", e);
+      handleStreamerFailure("flushBuffer " + getCurrentStreamer(), e);
     }
 
+    // if the last stripe is incomplete, generate and write parity cells
+    if (generateParityCellsForLastStripe()) {
+      writeParityCells();
+    }
+    enqueueAllCurrentPackets();
     for (int i = 0; i < numAllBlocks; i++) {
       final StripedDataStreamer s = setCurrentStreamer(i);
@@ -620,7 +628,7 @@ protected synchronized void closeImpl() throws IOException {
         // flush all data to Datanode
         flushInternal();
       } catch(Exception e) {
-        handleStreamerFailure("closeImpl", e);
+        handleStreamerFailure("flushInternal " + s, e, false);
       }
     }
   }
@@ -643,9 +651,13 @@ protected synchronized void closeImpl() throws IOException {
   private void enqueueAllCurrentPackets() throws IOException {
     int idx = streamers.indexOf(getCurrentStreamer());
     for(int i = 0; i < streamers.size(); i++) {
-      setCurrentStreamer(i);
-      if (currentPacket != null) {
-        enqueueCurrentPacket();
+      final StripedDataStreamer si = setCurrentStreamer(i);
+      if (!si.isFailed() && currentPacket != null) {
+        try {
+          enqueueCurrentPacket();
+        } catch (IOException e) {
+          handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e, false);
+        }
       }
     }
     setCurrentStreamer(idx);
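The net effect of the DFSStripedOutputStream hunks above is that closing a striped stream no longer aborts on the first streamer error: flushBuffer() failures are recorded per streamer, parity for a partial last stripe is generated only when generateParityCellsForLastStripe() reports one, and enqueueAllCurrentPackets() skips streamers that have already failed. From the client side, the case this protects is simply writing a small file and closing it. A hedged sketch of that client-side path (the path, the write size, and the erasure-coding setup of the target directory are assumptions, not part of the patch):

// Assumes fs.defaultFS points at an HDFS cluster and that the parent
// directory of the file is already configured for erasure coding.
Configuration conf = new HdfsConfiguration();
try (FileSystem fs = FileSystem.get(conf);
     FSDataOutputStream out = fs.create(new Path("/ec/smallFile"))) {
  byte[] data = new byte[64 * 1024 + 123]; // well below a full stripe
  out.write(data);
} // close() drives DFSStripedOutputStream#closeImpl(): parity cells for the
  // partial last stripe are written and failed streamers are tolerated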

View File

@@ -173,7 +173,7 @@ private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam)
     packets.clear();
   }
 
-  static class LastExceptionInStreamer {
+  class LastExceptionInStreamer {
     private IOException thrown;
 
     synchronized void set(Throwable t) {
@@ -191,7 +191,8 @@ synchronized void check(boolean resetToNull) throws IOException {
       if (thrown != null) {
         if (LOG.isTraceEnabled()) {
           // wrap and print the exception to know when the check is called
-          LOG.trace("Got Exception while checking", new Throwable(thrown));
+          LOG.trace("Got Exception while checking, " + DataStreamer.this,
+              new Throwable(thrown));
         }
         final IOException e = thrown;
         if (resetToNull) {
@@ -584,16 +585,13 @@ public void run() {
         }
 
         // get new block from namenode.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("stage=" + stage + ", " + this);
+        }
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Allocating new block " + this);
-          }
           setPipeline(nextBlockOutputStream());
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Append to block " + block);
-          }
           setupPipelineForAppendOrRecovery();
           if (streamerClosed) {
             continue;
@@ -639,8 +637,7 @@ public void run() {
         }
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("DataStreamer block " + block +
-              " sending packet " + one);
+          LOG.debug(this + " sending " + one);
         }
 
         // write out data to remote datanode
@@ -1426,16 +1423,21 @@ static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
   /** update pipeline at the namenode */
   ExtendedBlock updatePipeline(long newGS) throws IOException {
     final ExtendedBlock newBlock = newBlock(block, newGS);
-    return callUpdatePipeline(block, newBlock);
+    return callUpdatePipeline(block, newBlock, nodes, storageIDs);
   }
 
-  ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock)
+  ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock,
+      DatanodeInfo[] newNodes, String[] newStorageIDs)
       throws IOException {
     dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock,
-        nodes, storageIDs);
+        newNodes, newStorageIDs);
     return newBlock;
   }
 
+  int getNumBlockWriteRetry() {
+    return dfsClient.getConf().getNumBlockWriteRetry();
+  }
+
   /**
    * Open a DataStreamer to a DataNode so that it can be written to.
    * This happens when a file is created and each time a new block is allocated.
@@ -1446,7 +1448,7 @@ private LocatedBlock nextBlockOutputStream() throws IOException {
     LocatedBlock lb = null;
     DatanodeInfo[] nodes = null;
     StorageType[] storageTypes = null;
-    int count = dfsClient.getConf().getNumBlockWriteRetry();
+    int count = getNumBlockWriteRetry();
     boolean success = false;
     ExtendedBlock oldBlock = block;
     do {
@@ -1502,7 +1504,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
     String firstBadLink = "";
     boolean checkRestart = false;
     if (LOG.isDebugEnabled()) {
-      LOG.debug("pipeline = " + Arrays.asList(nodes));
+      LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
     }
 
     // persist blocks on namenode on next flush
@@ -1574,7 +1576,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
       errorState.reset();
     } catch (IOException ie) {
       if (!errorState.isRestartingNode()) {
-        LOG.info("Exception in createBlockOutputStream", ie);
+        LOG.info("Exception in createBlockOutputStream " + this, ie);
       }
       if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
         LOG.info("Will fetch a new encryption key and retry, "
@@ -1649,7 +1651,7 @@ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
     }
   }
 
-  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+  LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
       throws IOException {
     final DfsClientConf conf = dfsClient.getConf();
     int retries = conf.getNumBlockWriteLocateFollowingRetry();
@@ -1755,6 +1757,10 @@ DatanodeInfo[] getNodes() {
     return nodes;
   }
 
+  String[] getStorageIDs() {
+    return storageIDs;
+  }
+
   /**
    * return the token of the block
    *
@@ -1933,7 +1939,6 @@ void closeSocket() throws IOException {
   @Override
   public String toString() {
-    return (block == null? null: block.getLocalBlock())
-        + "@" + Arrays.toString(getNodes());
+    return block == null? "block==null": "" + block.getLocalBlock();
   }
 }

View File

@@ -23,6 +23,7 @@
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
@@ -39,6 +40,8 @@
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class extends {@link DataStreamer} to support writing striped blocks
  * to datanodes.
@@ -58,13 +61,13 @@ public class StripedDataStreamer extends DataStreamer {
    * @param <T> the queue entry type.
    */
   static abstract class ConcurrentPoll<T> {
-    private final MultipleBlockingQueue<T> queue;
+    final MultipleBlockingQueue<T> queue;
 
     ConcurrentPoll(MultipleBlockingQueue<T> queue) {
       this.queue = queue;
     }
 
-    T poll(final int i) throws IOException {
+    T poll(final int i) throws InterruptedIOException {
       for(;;) {
         synchronized(queue) {
           final T polled = queue.poll(i);
@@ -72,18 +75,17 @@ T poll(final int i) throws IOException {
             return polled;
           }
           if (isReady2Populate()) {
-            populate();
-            return queue.poll(i);
+            try {
+              populate();
+              return queue.poll(i);
+            } catch(IOException ioe) {
+              LOG.warn("Failed to populate, " + this, ioe);
+            }
           }
         }
 
         // sleep and then retry.
-        try {
-          Thread.sleep(100);
-        } catch(InterruptedException ie) {
-          throw DFSUtil.toInterruptedIOException(
-              "Sleep interrupted during poll", ie);
-        }
+        sleep(100, "poll");
       }
     }
@@ -94,6 +96,15 @@ boolean isReady2Populate() {
     abstract void populate() throws IOException;
   }
 
+  private static void sleep(long ms, String op) throws InterruptedIOException {
+    try {
+      Thread.sleep(ms);
+    } catch(InterruptedException ie) {
+      throw DFSUtil.toInterruptedIOException(
+          "Sleep interrupted during " + op, ie);
+    }
+  }
+
   private final Coordinator coordinator;
   private final int index;
   private volatile boolean failed;
@@ -135,11 +146,14 @@ protected void endBlock() {
   }
 
   @Override
-  protected LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
+  int getNumBlockWriteRetry() {
+    return 0;
+  }
+
+  @Override
+  LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
       throws IOException {
-    final MultipleBlockingQueue<LocatedBlock> followingBlocks
-        = coordinator.getFollowingBlocks();
-    return new ConcurrentPoll<LocatedBlock>(followingBlocks) {
+    return new ConcurrentPoll<LocatedBlock>(coordinator.getFollowingBlocks()) {
       @Override
       boolean isReady2Populate() {
         return super.isReady2Populate()
@@ -194,18 +208,24 @@ void populate() throws IOException {
             si.endBlock();
             si.close(true);
           } else {
-            followingBlocks.offer(i, blocks[i]);
+            queue.offer(i, blocks[i]);
           }
         }
       }
     }.poll(index);
   }
 
+  @VisibleForTesting
+  LocatedBlock peekFollowingBlock() {
+    return coordinator.getFollowingBlocks().peek(index);
+  }
+
   @Override
   LocatedBlock updateBlockForPipeline() throws IOException {
-    final MultipleBlockingQueue<LocatedBlock> newBlocks
-        = coordinator.getNewBlocks();
-    return new ConcurrentPoll<LocatedBlock>(newBlocks) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("updateBlockForPipeline(), " + this);
+    }
+    return new ConcurrentPoll<LocatedBlock>(coordinator.getNewBlocks()) {
       @Override
       void populate() throws IOException {
         final ExtendedBlock bg = coordinator.getBlockGroup();
@@ -224,10 +244,22 @@ void populate() throws IOException {
             final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
                 null, null, null, -1, updated.isCorrupt(), null);
             lb.setBlockToken(updatedBlks[i].getBlockToken());
-            newBlocks.offer(i, lb);
+            queue.offer(i, lb);
           } else {
-            final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i);
-            lb.getBlock().setGenerationStamp(newGS);
+            final MultipleBlockingQueue<LocatedBlock> followingBlocks
+                = coordinator.getFollowingBlocks();
+            synchronized(followingBlocks) {
+              final LocatedBlock lb = followingBlocks.peek(i);
+              if (lb != null) {
+                lb.getBlock().setGenerationStamp(newGS);
+                si.getErrorState().reset();
+                continue;
+              }
+            }
+            //streamer i just have polled the block, sleep and retry.
+            sleep(100, "updateBlockForPipeline, " + this);
+            i--;
           }
         }
       }
@@ -236,21 +268,64 @@ void populate() throws IOException {
   @Override
   ExtendedBlock updatePipeline(final long newGS) throws IOException {
-    final MultipleBlockingQueue<ExtendedBlock> updateBlocks
-        = coordinator.getUpdateBlocks();
-    return new ConcurrentPoll<ExtendedBlock>(updateBlocks) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("updatePipeline(newGS=" + newGS + "), " + this);
+    }
+    return new ConcurrentPoll<ExtendedBlock>(coordinator.getUpdateBlocks()) {
       @Override
       void populate() throws IOException {
+        final MultipleBlockingQueue<LocatedBlock> followingBlocks
+            = coordinator.getFollowingBlocks();
         final ExtendedBlock bg = coordinator.getBlockGroup();
         final ExtendedBlock newBG = newBlock(bg, newGS);
-        final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
-        for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
-          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
-          if (si.isFailed()) {
-            continue; // skipping failed data streamer
+
+        final int n = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+        final DatanodeInfo[] newNodes = new DatanodeInfo[n];
+        final String[] newStorageIDs = new String[n];
+        for (int i = 0; i < n; i++) {
+          final StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+          DatanodeInfo[] nodes = si.getNodes();
+          String[] storageIDs = si.getStorageIDs();
+          if (nodes == null || storageIDs == null) {
+            synchronized(followingBlocks) {
+              final LocatedBlock lb = followingBlocks.peek(i);
+              if (lb != null) {
+                nodes = lb.getLocations();
+                storageIDs = lb.getStorageIDs();
+              }
+            }
           }
+          if (nodes != null && storageIDs != null) {
+            newNodes[i] = nodes[0];
+            newStorageIDs[i] = storageIDs[0];
+          } else {
+            //streamer i just have polled the block, sleep and retry.
+            sleep(100, "updatePipeline, " + this);
+            i--;
+          }
+        }
+        final ExtendedBlock updated = callUpdatePipeline(bg, newBG, newNodes,
+            newStorageIDs);
+
+        for (int i = 0; i < n; i++) {
+          final StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
           final ExtendedBlock bi = si.getBlock();
-          updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
+          if (bi != null) {
+            queue.offer(i, newBlock(bi, updated.getGenerationStamp()));
+          } else if (!si.isFailed()) {
+            synchronized(followingBlocks) {
+              final LocatedBlock lb = followingBlocks.peek(i);
+              if (lb != null) {
+                lb.getBlock().setGenerationStamp(newGS);
+                si.getErrorState().reset();
+                continue;
+              }
+            }
+            //streamer i just have polled the block, sleep and retry.
+            sleep(100, "updatePipeline, " + this);
+            i--;
+          }
         }
       }
     }.poll(index);
@@ -258,7 +333,6 @@ void populate() throws IOException {
   @Override
   public String toString() {
-    return "#" + index + ": failed? " + Boolean.toString(failed).charAt(0)
-        + ", " + super.toString();
+    return "#" + index + ": " + (failed? "failed, ": "") + super.toString();
   }
 }
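The ConcurrentPoll changes above make block allocation and pipeline updates resilient when only one of the striped streamers performs the namenode call and the others wait for their entry to appear in a shared queue: populate() failures are logged and retried, and the bare Thread.sleep() is factored into the sleep(ms, op) helper. A simplified, self-contained model of that poll/populate/retry pattern (class name and queue type are illustrative; the real code works against MultipleBlockingQueue with a per-streamer index):

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;

abstract class RetryingPoll<T> {
  // Shared queue that one caller fills on behalf of all callers.
  final Queue<T> queue = new ArrayDeque<>();

  // Subclasses produce results for every participant in one shot.
  abstract void populate() throws IOException;
  boolean isReady2Populate() { return true; }

  T poll() throws InterruptedIOException {
    for (;;) {
      synchronized (queue) {
        final T polled = queue.poll();
        if (polled != null) {
          return polled;
        }
        if (isReady2Populate()) {
          try {
            populate();
            return queue.poll();
          } catch (IOException ioe) {
            // populate() failed; fall through, sleep and retry
          }
        }
      }
      try {
        Thread.sleep(100);
      } catch (InterruptedException ie) {
        throw new InterruptedIOException("Sleep interrupted during poll");
      }
    }
  }
}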

View File

@@ -134,6 +134,9 @@ static void abandonBlock(
     FSNamesystem fsn = fsd.getFSNamesystem();
     final INodeFile file = fsn.checkLease(src, holder, inode, fileId);
     Preconditions.checkState(file.isUnderConstruction());
+    if (file.isStriped()) {
+      return; // do not abandon block for striped file
+    }
 
     Block localBlock = ExtendedBlock.getLocalBlock(b);
     fsd.writeLock();

View File

@@ -120,6 +120,7 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.base.Joiner;
@@ -1867,6 +1868,7 @@ public void shutdown(boolean deleteDfsDir, boolean closeFileSystem) {
         nameNode = null;
       }
     }
+    ShutdownHookManager.get().clearShutdownHooks();
     if (base_dir != null) {
       if (deleteDfsDir) {
         base_dir.delete();

View File

@@ -34,7 +34,6 @@
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -169,6 +168,7 @@ private byte getByte(long pos) {
   }
 
   private void testOneFile(String src, int writeBytes) throws IOException {
+    src += "_" + writeBytes;
     Path testPath = new Path(src);
     byte[] bytes = generateBytes(writeBytes);

View File

@@ -28,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +49,7 @@
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
@@ -71,6 +73,38 @@ public class TestDFSStripedOutputStreamWithFailure {
   private static final int FLUSH_POS
       = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
 
+  static {
+    System.out.println("NUM_DATA_BLOCKS = " + NUM_DATA_BLOCKS);
+    System.out.println("NUM_PARITY_BLOCKS= " + NUM_PARITY_BLOCKS);
+    System.out.println("CELL_SIZE = " + CELL_SIZE
+        + " (=" + StringUtils.TraditionalBinaryPrefix.long2String(CELL_SIZE, "B", 2) + ")");
+    System.out.println("BLOCK_SIZE = " + BLOCK_SIZE
+        + " (=" + StringUtils.TraditionalBinaryPrefix.long2String(BLOCK_SIZE, "B", 2) + ")");
+    System.out.println("BLOCK_GROUP_SIZE = " + BLOCK_GROUP_SIZE
+        + " (=" + StringUtils.TraditionalBinaryPrefix.long2String(BLOCK_GROUP_SIZE, "B", 2) + ")");
+  }
+
+  static List<Integer> newLengths() {
+    final List<Integer> lengths = new ArrayList<>();
+    lengths.add(FLUSH_POS + 2);
+    for(int b = 0; b <= 2; b++) {
+      for(int c = 0; c < STRIPES_PER_BLOCK*NUM_DATA_BLOCKS; c++) {
+        for(int delta = -1; delta <= 1; delta++) {
+          final int length = b*BLOCK_GROUP_SIZE + c*CELL_SIZE + delta;
+          System.out.println(lengths.size() + ": length=" + length
+              + ", (b, c, d) = (" + b + ", " + c + ", " + delta + ")");
+          lengths.add(length);
+        }
+      }
+    }
+    return lengths;
+  }
+
+  private static final List<Integer> LENGTHS = newLengths();
+
+  static int getLength(int i) {
+    return LENGTHS.get(i);
+  }
+
   private MiniDFSCluster cluster;
   private DistributedFileSystem dfs;
@@ -96,50 +130,49 @@ private static byte getByte(long pos) {
     return (byte)pos;
   }
 
-  private void initConf(Configuration conf){
+  private HdfsConfiguration newHdfsConfiguration() {
+    final HdfsConfiguration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    return conf;
   }
 
-  private void initConfWithBlockToken(Configuration conf) {
-    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
-    conf.setInt("ipc.client.connect.max.retries", 0);
-    // Set short retry timeouts so this test runs faster
-    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
-  }
-
-  @Test(timeout=240000)
-  public void testDatanodeFailure() throws Exception {
-    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
-    HdfsConfiguration conf = new HdfsConfiguration();
-    initConf(conf);
+  void runTest(final int length) {
+    final HdfsConfiguration conf = newHdfsConfiguration();
     for (int dn = 0; dn < 9; dn++) {
       try {
         setup(conf);
-        cluster.startDataNodes(conf, 1, true, null, null);
-        cluster.waitActive();
-        runTest(new Path(dir, "file" + dn), length, length / 2, dn, false);
+        runTest(length, dn, false, conf);
       } catch (Exception e) {
-        LOG.error("failed, dn=" + dn + ", length=" + length);
-        throw e;
+        final String err = "failed, dn=" + dn + ", length=" + length
+            + StringUtils.stringifyException(e);
+        LOG.error(err);
+        Assert.fail(err);
       } finally {
        tearDown();
      }
    }
  }
 
+  @Test(timeout=240000)
+  public void testDatanodeFailure56() throws Exception {
+    runTest(getLength(56));
+  }
+
   @Test(timeout=240000)
   public void testBlockTokenExpired() throws Exception {
     final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
-    HdfsConfiguration conf = new HdfsConfiguration();
-    initConf(conf);
-    initConfWithBlockToken(conf);
+    final HdfsConfiguration conf = newHdfsConfiguration();
+
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
     for (int dn = 0; dn < 9; dn += 2) {
       try {
         setup(conf);
-        cluster.startDataNodes(conf, 1, true, null, null);
-        cluster.waitActive();
-        runTest(new Path(dir, "file" + dn), length, length / 2, dn, true);
+        runTest(length, dn, true, conf);
       } catch (Exception e) {
         LOG.error("failed, dn=" + dn + ", length=" + length);
         throw e;
@@ -229,19 +262,41 @@ public void testAddBlockWhenNoSufficientParityNumOfNodes() throws IOException {
     }
   }
 
-  private void runTest(final Path p, final int length, final int killPos,
-      final int dnIndex, final boolean tokenExpire) throws Exception {
-    LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos
-        + ", dnIndex=" + dnIndex);
-    Preconditions.checkArgument(killPos < length);
-    Preconditions.checkArgument(killPos > FLUSH_POS);
-    final String fullPath = p.toString();
+  private void runTest(final int length, final int dnIndex,
+      final boolean tokenExpire, final HdfsConfiguration conf) {
+    try {
+      runTest(length, length/2, dnIndex, tokenExpire, conf);
+    } catch(Exception e) {
+      LOG.info("FAILED", e);
+      Assert.fail(StringUtils.stringifyException(e));
+    }
+  }
+
+  private void runTest(final int length, final int killPos,
+      final int dnIndex, final boolean tokenExpire,
+      final HdfsConfiguration conf) throws Exception {
+    if (killPos <= FLUSH_POS) {
+      LOG.warn("killPos=" + killPos + " <= FLUSH_POS=" + FLUSH_POS
+          + ", length=" + length + ", dnIndex=" + dnIndex);
+      return; //skip test
+    }
+    Preconditions.checkArgument(length > killPos,
+        "length=%s <= killPos=%s", length, killPos);
+
+    // start a datanode now, will kill one later
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.waitActive();
+
+    final Path p = new Path(dir, "dn" + dnIndex + "len" + length + "kill" + killPos);
+    final String fullPath = p.toString();
+    LOG.info("fullPath=" + fullPath);
+
+    final NameNode nn = cluster.getNameNode();
+    final BlockManager bm = nn.getNamesystem().getBlockManager();
+    final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
 
     if (tokenExpire) {
-      final NameNode nn = cluster.getNameNode();
-      final BlockManager bm = nn.getNamesystem().getBlockManager();
-      final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
-
       // set a short token lifetime (1 second)
       SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
     }
@@ -265,7 +320,7 @@ private void runTest(final Path p, final int length, final int killPos,
         waitTokenExpires(out);
       }
 
-      StripedFileTestUtil.killDatanode(cluster, stripedOut, dnIndex, pos);
+      killDatanode(cluster, stripedOut, dnIndex, pos);
       killed = true;
     }
@@ -301,6 +356,40 @@ static long getGenerationStamp(DFSStripedOutputStream out)
   }
 
+  static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
+    for(;;) {
+      DatanodeInfo[] datanodes = streamer.getNodes();
+      if (datanodes == null) {
+        // try peeking following block.
+        final LocatedBlock lb = streamer.peekFollowingBlock();
+        if (lb != null) {
+          datanodes = lb.getLocations();
+        }
+      }
+
+      if (datanodes != null) {
+        Assert.assertEquals(1, datanodes.length);
+        Assert.assertNotNull(datanodes[0]);
+        return datanodes[0];
+      }
+
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        Assert.fail(StringUtils.stringifyException(ie));
+        return null;
+      }
+    }
+  }
+
+  static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
+      final int dnIndex, final AtomicInteger pos) {
+    final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
+    final DatanodeInfo datanode = getDatanodes(s);
+    LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
+    cluster.stopDataNode(datanode.getXferAddr());
+  }
+
   static void checkData(DistributedFileSystem dfs, String src, int length,
       int killedDnIndex, long oldGS) throws IOException {
     List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
@@ -314,7 +403,7 @@ static void checkData(DistributedFileSystem dfs, String src, int length,
       final long gs = firstBlock.getBlock().getGenerationStamp();
       final String s = "gs=" + gs + ", oldGS=" + oldGS;
       LOG.info(s);
-      Assert.assertTrue(s, gs > oldGS);
+      Assert.assertTrue(s, gs >= oldGS);
 
       LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
           (LocatedStripedBlock) firstBlock,
@@ -342,7 +431,7 @@ static void checkData(DistributedFileSystem dfs, String src, int length,
         final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
             + (j <= lastCellIndex? 1: 0);
         final int blockSize = numCellInBlock*CELL_SIZE
-            + (isLastGroup && i == lastCellIndex? lastCellSize - CELL_SIZE: 0);
+            + (isLastGroup && j == lastCellIndex? lastCellSize - CELL_SIZE: 0);
 
         final byte[] blockBytes = new byte[blockSize];
         if (i < NUM_DATA_BLOCKS) {
@@ -352,7 +441,8 @@ static void checkData(DistributedFileSystem dfs, String src, int length,
         }
 
         final LocatedBlock lb = blockList.get(i);
-        LOG.info("XXX i=" + i + ", lb=" + lb);
+        LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
+            + ", blockSize=" + blockSize + ", lb=" + lb);
         if (lb == null) {
           continue;
         }
@@ -410,4 +500,35 @@ private void waitTokenExpires(FSDataOutputStream out) throws IOException {
       }
     }
   }
+
+  public static abstract class TestBase {
+    static final long TIMEOUT = 240000;
+
+    int getBase() {
+      final String name = getClass().getSimpleName();
+      int i = name.length() - 1;
+      for(; i >= 0 && Character.isDigit(name.charAt(i)); i--);
+      return Integer.parseInt(name.substring(i + 1));
+    }
+
+    private final TestDFSStripedOutputStreamWithFailure test
+        = new TestDFSStripedOutputStreamWithFailure();
+    private void run(int offset) {
+      final int i = offset + getBase();
+      final int length = getLength(i);
+      System.out.println("Run test " + i + ", length=" + length);
+      test.runTest(length);
+    }
+
+    @Test(timeout=TIMEOUT) public void test0() {run(0);}
+    @Test(timeout=TIMEOUT) public void test1() {run(1);}
+    @Test(timeout=TIMEOUT) public void test2() {run(2);}
+    @Test(timeout=TIMEOUT) public void test3() {run(3);}
+    @Test(timeout=TIMEOUT) public void test4() {run(4);}
+    @Test(timeout=TIMEOUT) public void test5() {run(5);}
+    @Test(timeout=TIMEOUT) public void test6() {run(6);}
+    @Test(timeout=TIMEOUT) public void test7() {run(7);}
+    @Test(timeout=TIMEOUT) public void test8() {run(8);}
+    @Test(timeout=TIMEOUT) public void test9() {run(9);}
+  }
 }
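TestBase derives a base index from the trailing digits of the concrete subclass name and runs getLength(base + 0) through getLength(base + 9), so the TestDFSStripedOutputStreamWithFailure000 subclass below covers lengths 0 to 9 and the 010 subclass covers 10 to 19. A hypothetical further split (not added by this commit) would follow the same naming rule:

package org.apache.hadoop.hdfs;

import org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure.TestBase;

// Hypothetical subclass: base 020, so it would run getLength(20)..getLength(29).
public class TestDFSStripedOutputStreamWithFailure020 extends TestBase {}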

View File

@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure.TestBase;
public class TestDFSStripedOutputStreamWithFailure000 extends TestBase {}

View File

@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure.TestBase;
public class TestDFSStripedOutputStreamWithFailure010 extends TestBase {}