From 6419900ac24a5493827abf9b5d90373bc1043e0b Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Mon, 28 Sep 2015 14:40:27 -0700 Subject: [PATCH] HDFS-9040. Erasure coding: coordinate data streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su. --- .../hadoop/hdfs/protocol/DatanodeID.java | 2 + .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 + .../apache/hadoop/hdfs/DFSOutputStream.java | 62 +- .../hadoop/hdfs/DFSStripedOutputStream.java | 603 +++++++++++++----- .../org/apache/hadoop/hdfs/DataStreamer.java | 214 +++---- .../hadoop/hdfs/StripedDataStreamer.java | 336 +++------- .../BlockUnderConstructionFeature.java | 30 +- .../blockmanagement/DatanodeManager.java | 4 + .../hadoop/hdfs/util/StripedBlockUtil.java | 23 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 31 +- .../hadoop/hdfs/StripedFileTestUtil.java | 213 ++++++- .../hdfs/TestDFSStripedOutputStream.java | 144 +---- ...TestDFSStripedOutputStreamWithFailure.java | 300 ++++----- .../hdfs/TestWriteStripedFileWithFailure.java | 8 + .../hdfs/server/balancer/TestBalancer.java | 4 +- .../hadoop/hdfs/server/mover/TestMover.java | 4 +- .../TestAddOverReplicatedStripedBlocks.java | 12 +- .../namenode/ha/TestRetryCacheWithHA.java | 9 +- 18 files changed, 1066 insertions(+), 936 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java index 6d72285426..c709cbdd47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java @@ -38,6 +38,8 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceStability.Evolving public class DatanodeID implements Comparable { public static final DatanodeID[] EMPTY_ARRAY = {}; + public static final DatanodeID EMPTY_DATANODE_ID = new DatanodeID("null", + "null", "null", 0, 0, 0, 0); private String ipAddr; // IP address private String hostName; // hostname claimed by datanode diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index d62dbacd4e..6a01d616f9 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -450,3 +450,6 @@ HDFS-8882. Erasure Coding: Use datablocks, parityblocks and cell size from ErasureCodingPolicy (Vinayakumar B via zhz) + + HDFS-9040. Erasure coding: coordinate data streamers in + DFSStripedOutputStream. 
(jing9 and Walter Su) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 4923c86f7f..e77a00a819 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.util.ByteArrayManager; @@ -212,14 +213,17 @@ public class DFSOutputStream extends FSOutputSummer /** Construct a new output stream for creating a file. */ protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, - DataChecksum checksum, String[] favoredNodes) throws IOException { + DataChecksum checksum, String[] favoredNodes, boolean createStreamer) + throws IOException { this(dfsClient, src, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); - streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, - cachingStrategy, byteArrayManager, favoredNodes); + if (createStreamer) { + streamer = new DataStreamer(stat, null, dfsClient, src, progress, + checksum, cachingStrategy, byteArrayManager, favoredNodes); + } } static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, @@ -276,7 +280,7 @@ public class DFSOutputStream extends FSOutputSummer flag, progress, checksum, favoredNodes); } else { out = new DFSOutputStream(dfsClient, src, stat, - flag, progress, checksum, favoredNodes); + flag, progress, checksum, favoredNodes, true); } out.start(); return out; @@ -476,7 +480,7 @@ public class DFSOutputStream extends FSOutputSummer * * @throws IOException */ - protected void endBlock() throws IOException { + void endBlock() throws IOException { if (getStreamer().getBytesCurBlock() == blockSize) { setCurrentPacketToEmpty(); enqueueCurrentPacket(); @@ -921,4 +925,52 @@ public class DFSOutputStream extends FSOutputSummer public String toString() { return getClass().getSimpleName() + ":" + streamer; } + + static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient, + String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes) + throws IOException { + final DfsClientConf conf = dfsClient.getConf(); + int retries = conf.getNumBlockWriteLocateFollowingRetry(); + long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); + long localstart = Time.monotonicNow(); + while (true) { + try { + return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock, + excludedNodes, fileId, favoredNodes); + } catch (RemoteException e) { + IOException ue = e.unwrapRemoteException(FileNotFoundException.class, + AccessControlException.class, + NSQuotaExceededException.class, + DSQuotaExceededException.class, + QuotaByStorageTypeExceededException.class, + UnresolvedPathException.class); + if (ue != e) { + throw ue; 
// no need to retry these exceptions + } + if (NotReplicatedYetException.class.getName().equals(e.getClassName())) { + if (retries == 0) { + throw e; + } else { + --retries; + LOG.info("Exception while adding a block", e); + long elapsed = Time.monotonicNow() - localstart; + if (elapsed > 5000) { + LOG.info("Waiting for replication for " + (elapsed / 1000) + + " seconds"); + } + try { + LOG.warn("NotReplicatedYetException sleeping " + src + + " retries left " + retries); + Thread.sleep(sleeptime); + sleeptime *= 2; + } catch (InterruptedException ie) { + LOG.warn("Caught exception", ie); + } + } + } else { + throw e; + } + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index d3a054affe..c145a2a1fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -25,23 +25,34 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.Time; import org.apache.htrace.Sampler; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -59,23 +70,11 @@ public class DFSStripedOutputStream extends DFSOutputStream { private final List> queues; MultipleBlockingQueue(int numQueue, int queueSize) { - queues = new ArrayList<>(numQueue); + List> list = new ArrayList<>(numQueue); for (int i = 0; i < numQueue; i++) { - queues.add(new LinkedBlockingQueue(queueSize)); + list.add(new LinkedBlockingQueue(queueSize)); } - } - - boolean isEmpty() { - for(int i = 0; i < queues.size(); i++) { - if (!queues.get(i).isEmpty()) { - return false; - } - } - return true; - } - - int numQueues() { - return queues.size(); + queues = Collections.synchronizedList(list); } void offer(int i, T object) { @@ -92,6 +91,14 @@ public class DFSStripedOutputStream extends DFSOutputStream { } } + T takeWithTimeout(int i) throws InterruptedIOException { + try { + return queues.get(i).poll(100, TimeUnit.MILLISECONDS); + } catch 
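The addBlock() helper moved into DFSOutputStream above keeps DataStreamer's original retry behaviour for NotReplicatedYetException: unwrap the RemoteException, rethrow anything non-retriable, and otherwise sleep with a doubling delay until the configured retry count runs out. A minimal standalone sketch of that backoff shape follows; the class, helper names, and retry numbers are invented for illustration and are not the HDFS code itself.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Illustrative sketch of the doubling-backoff retry used by addBlock();
// BackoffRetrySketch and the numbers below are assumptions, not HDFS APIs.
public class BackoffRetrySketch {
  static <T> T retryWithBackoff(Callable<T> op, int retries, long initialSleepMs)
      throws Exception {
    long sleep = initialSleepMs;
    while (true) {
      try {
        return op.call();
      } catch (IOException e) {          // stands in for NotReplicatedYetException
        if (retries-- == 0) {
          throw e;                       // retry budget exhausted
        }
        Thread.sleep(sleep);             // wait before asking the NameNode again
        sleep *= 2;                      // exponential backoff, as in the patch
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] attempts = {0};
    String result = retryWithBackoff(() -> {
      if (attempts[0]++ < 2) {
        throw new IOException("not replicated yet");
      }
      return "located block";
    }, 5, 100L);
    System.out.println(result + " after " + attempts[0] + " attempts");
  }
}
```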
(InterruptedException e) { + throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, e); + } + } + T poll(int i) { return queues.get(i).poll(); } @@ -99,23 +106,44 @@ public class DFSStripedOutputStream extends DFSOutputStream { T peek(int i) { return queues.get(i).peek(); } + + void clear() { + for (BlockingQueue q : queues) { + q.clear(); + } + } } /** Coordinate the communication between the streamers. */ - class Coordinator { + static class Coordinator { + /** + * The next internal block to write to for each streamers. The + * DFSStripedOutputStream makes the {@link ClientProtocol#addBlock} RPC to + * get a new block group. The block group is split to internal blocks, which + * are then distributed into the queue for streamers to retrieve. + */ private final MultipleBlockingQueue followingBlocks; + /** + * Used to sync among all the streamers before allocating a new block. The + * DFSStripedOutputStream uses this to make sure every streamer has finished + * writing the previous block. + */ private final MultipleBlockingQueue endBlocks; + /** + * The following data structures are used for syncing while handling errors + */ private final MultipleBlockingQueue newBlocks; - private final MultipleBlockingQueue updateBlocks; + private final Map updateStreamerMap; + private final MultipleBlockingQueue streamerUpdateResult; - Coordinator(final DfsClientConf conf, final int numDataBlocks, - final int numAllBlocks) { + Coordinator(final int numAllBlocks) { followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); - endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1); - + endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); - updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); + updateStreamerMap = Collections.synchronizedMap( + new HashMap(numAllBlocks)); + streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1); } MultipleBlockingQueue getFollowingBlocks() { @@ -126,68 +154,28 @@ public class DFSStripedOutputStream extends DFSOutputStream { return newBlocks; } - MultipleBlockingQueue getUpdateBlocks() { - return updateBlocks; - } - - StripedDataStreamer getStripedDataStreamer(int i) { - return DFSStripedOutputStream.this.getStripedDataStreamer(i); - } - void offerEndBlock(int i, ExtendedBlock block) { endBlocks.offer(i, block); } - ExtendedBlock takeEndBlock(int i) throws InterruptedIOException { - return endBlocks.take(i); + void offerStreamerUpdateResult(int i, boolean success) { + streamerUpdateResult.offer(i, success); } - boolean hasAllEndBlocks() { - for(int i = 0; i < endBlocks.numQueues(); i++) { - if (endBlocks.peek(i) == null) { - return false; - } - } - return true; + boolean takeStreamerUpdateResult(int i) throws InterruptedIOException { + return streamerUpdateResult.take(i); } - void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) { - ExtendedBlock b = endBlocks.peek(i); - if (b == null) { - // streamer just has failed, put end block and continue - b = block; - offerEndBlock(i, b); - } - b.setNumBytes(newBytes); + void updateStreamer(StripedDataStreamer streamer, + boolean success) { + assert !updateStreamerMap.containsKey(streamer); + updateStreamerMap.put(streamer, success); } - /** @return a block representing the entire block group. 
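The reworked Coordinator above replaces the old shared getBlockGroup()/updateBlocks machinery with a handful of MultipleBlockingQueue instances — one bounded slot per streamer for following blocks, end blocks, new blocks, and update results — plus a synchronized map of per-streamer outcomes. A rough sketch of that per-index queue pattern, using hypothetical class and method names rather than the patch's own:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the idea behind MultipleBlockingQueue: one bounded queue
// per streamer index, so the coordinator hands each streamer at most one
// pending item (block, result, ...) at a time. Illustrative only.
public class PerStreamerQueues<T> {
  private final List<BlockingQueue<T>> queues;

  public PerStreamerQueues(int numStreamers, int capacity) {
    queues = new ArrayList<>(numStreamers);
    for (int i = 0; i < numStreamers; i++) {
      queues.add(new LinkedBlockingQueue<T>(capacity));
    }
  }

  public void offer(int i, T item) {
    // capacity 1 per slot mirrors the patch: at most one outstanding item
    if (!queues.get(i).offer(item)) {
      throw new IllegalStateException("slot " + i + " already has a pending item");
    }
  }

  public T takeWithTimeout(int i, long millis) throws InterruptedException {
    // returns null on timeout, letting the caller re-check streamer health
    return queues.get(i).poll(millis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    PerStreamerQueues<String> q = new PerStreamerQueues<>(3, 1);
    q.offer(1, "internal block #1");
    System.out.println(q.takeWithTimeout(1, 100));   // internal block #1
    System.out.println(q.takeWithTimeout(2, 100));   // null: nothing queued
  }
}
```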
*/ - ExtendedBlock getBlockGroup() { - final StripedDataStreamer s0 = getStripedDataStreamer(0); - final ExtendedBlock b0 = s0.getBlock(); - if (b0 == null) { - return null; - } - - final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0; - - final ExtendedBlock block = new ExtendedBlock(b0); - long numBytes = atBlockGroupBoundary? b0.getNumBytes(): s0.getBytesCurBlock(); - for (int i = 1; i < numAllBlocks; i++) { - final StripedDataStreamer si = getStripedDataStreamer(i); - final ExtendedBlock bi = si.getBlock(); - if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) { - block.setGenerationStamp(bi.getGenerationStamp()); - } - if (i < numDataBlocks) { - numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock(); - } - } - block.setNumBytes(numBytes); - if (LOG.isDebugEnabled()) { - LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes()); - } - return block; + void clearFailureStates() { + newBlocks.clear(); + updateStreamerMap.clear(); + streamerUpdateResult.clear(); } } @@ -263,18 +251,16 @@ public class DFSStripedOutputStream extends DFSOutputStream { private final int cellSize; private final int numAllBlocks; private final int numDataBlocks; - - @Override - ExtendedBlock getBlock() { - return coordinator.getBlockGroup(); - } + private ExtendedBlock currentBlockGroup; + private final String[] favoredNodes; + private final List failedStreamers; /** Construct a new output stream for creating a file. */ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException { - super(dfsClient, src, stat, flag, progress, checksum, favoredNodes); + super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false); if (LOG.isDebugEnabled()) { LOG.debug("Creating DFSStripedOutputStream for " + src); } @@ -284,12 +270,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { cellSize = ecPolicy.getCellSize(); numDataBlocks = ecPolicy.getNumDataUnits(); numAllBlocks = numDataBlocks + numParityBlocks; + this.favoredNodes = favoredNodes; + failedStreamers = new ArrayList<>(); encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(), numDataBlocks, numParityBlocks); - coordinator = new Coordinator(dfsClient.getConf(), - numDataBlocks, numAllBlocks); + coordinator = new Coordinator(numAllBlocks); try { cellBuffers = new CellBuffers(numParityBlocks); } catch (InterruptedException ie) { @@ -297,14 +284,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { "Failed to create cell buffers", ie); } - List s = new ArrayList<>(numAllBlocks); + streamers = new ArrayList<>(numAllBlocks); for (short i = 0; i < numAllBlocks; i++) { StripedDataStreamer streamer = new StripedDataStreamer(stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager, favoredNodes, i, coordinator); - s.add(streamer); + streamers.add(streamer); } - streamers = Collections.unmodifiableList(s); currentPackets = new DFSPacket[streamers.size()]; setCurrentStreamer(0); } @@ -318,17 +304,19 @@ public class DFSStripedOutputStream extends DFSOutputStream { } private synchronized StripedDataStreamer getCurrentStreamer() { - return (StripedDataStreamer)streamer; + return (StripedDataStreamer) streamer; } private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) { // backup currentPacket for current streamer - int oldIdx = streamers.indexOf(streamer); - if (oldIdx >= 0) { - 
currentPackets[oldIdx] = currentPacket; + if (streamer != null) { + int oldIdx = streamers.indexOf(getCurrentStreamer()); + if (oldIdx >= 0) { + currentPackets[oldIdx] = currentPacket; + } } - streamer = streamers.get(newIdx); + streamer = getStripedDataStreamer(newIdx); currentPacket = currentPackets[newIdx]; adjustChunkBoundary(); @@ -350,38 +338,125 @@ public class DFSStripedOutputStream extends DFSOutputStream { encoder.encode(dataBuffers, parityBuffers); } - - private void checkStreamers(boolean setExternalError) throws IOException { - int count = 0; + /** + * check all the existing StripedDataStreamer and find newly failed streamers. + * @return The newly failed streamers. + * @throws IOException if less than {@link #numDataBlocks} streamers are still + * healthy. + */ + private Set checkStreamers() throws IOException { + Set newFailed = new HashSet<>(); for(StripedDataStreamer s : streamers) { - if (!s.isFailed()) { - if (setExternalError && s.getBlock() != null) { - s.getErrorState().initExternalError(); - } - count++; + if (!s.isHealthy() && !failedStreamers.contains(s)) { + newFailed.add(s); } } + + final int failCount = failedStreamers.size() + newFailed.size(); if (LOG.isDebugEnabled()) { LOG.debug("checkStreamers: " + streamers); - LOG.debug("count=" + count); + LOG.debug("healthy streamer count=" + (numAllBlocks - failCount)); + LOG.debug("original failed streamers: " + failedStreamers); + LOG.debug("newly failed streamers: " + newFailed); } - if (count < numDataBlocks) { - throw new IOException("Failed: the number of remaining blocks = " - + count + " < the number of data blocks = " + numDataBlocks); + if (failCount > (numAllBlocks - numDataBlocks)) { + throw new IOException("Failed: the number of failed blocks = " + + failCount + " > the number of data blocks = " + + (numAllBlocks - numDataBlocks)); } + return newFailed; } private void handleStreamerFailure(String err, Exception e) throws IOException { - handleStreamerFailure(err, e, true); + LOG.warn("Failed: " + err + ", " + this, e); + getCurrentStreamer().getErrorState().setInternalError(); + getCurrentStreamer().close(true); + checkStreamers(); + currentPacket = null; } - private void handleStreamerFailure(String err, Exception e, - boolean setExternalError) throws IOException { - LOG.warn("Failed: " + err + ", " + this, e); - getCurrentStreamer().setFailed(true); - checkStreamers(setExternalError); - currentPacket = null; + private void replaceFailedStreamers() { + assert streamers.size() == numAllBlocks; + for (short i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer oldStreamer = getStripedDataStreamer(i); + if (!oldStreamer.isHealthy()) { + StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat, + dfsClient, src, oldStreamer.progress, + oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager, + favoredNodes, i, coordinator); + streamers.set(i, streamer); + currentPackets[i] = null; + if (i == 0) { + this.streamer = streamer; + } + streamer.start(); + } + } + } + + private void waitEndBlocks(int i) throws IOException { + while (getStripedDataStreamer(i).isHealthy()) { + final ExtendedBlock b = coordinator.endBlocks.takeWithTimeout(i); + if (b != null) { + StripedBlockUtil.checkBlocks(currentBlockGroup, i, b); + return; + } + } + } + + private void allocateNewBlock() throws IOException { + if (currentBlockGroup != null) { + for (int i = 0; i < numAllBlocks; i++) { + // sync all the healthy streamers before writing to the new block + waitEndBlocks(i); + } + } + 
failedStreamers.clear(); + // replace failed streamers + replaceFailedStreamers(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Allocating new block group. The previous block group: " + + currentBlockGroup); + } + + // TODO collect excludedNodes from all the data streamers + final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup, + fileId, favoredNodes); + assert lb.isStriped(); + if (lb.getLocations().length < numDataBlocks) { + throw new IOException("Failed to get " + numDataBlocks + + " nodes from namenode: blockGroupSize= " + numAllBlocks + + ", blocks.length= " + lb.getLocations().length); + } + // assign the new block to the current block group + currentBlockGroup = lb.getBlock(); + + final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock) lb, cellSize, numDataBlocks, + numAllBlocks - numDataBlocks); + for (int i = 0; i < blocks.length; i++) { + StripedDataStreamer si = getStripedDataStreamer(i); + if (si.isHealthy()) { // skipping failed data streamer + if (blocks[i] == null) { + // Set exception and close streamer as there is no block locations + // found for the parity block. + LOG.warn("Failed to get block location for parity block, index=" + i); + si.getLastException().set( + new IOException("Failed to get following block, i=" + i)); + si.getErrorState().setInternalError(); + si.close(true); + } else { + coordinator.getFollowingBlocks().offer(i, blocks[i]); + } + } + } + } + + private boolean shouldEndBlockGroup() { + return currentBlockGroup != null && + currentBlockGroup.getNumBytes() == blockSize * numDataBlocks; } @Override @@ -392,8 +467,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { final int pos = cellBuffers.addTo(index, bytes, offset, len); final boolean cellFull = pos == cellSize; - final long oldBytes = current.getBytesCurBlock(); - if (!current.isFailed()) { + if (currentBlockGroup == null || shouldEndBlockGroup()) { + // the incoming data should belong to a new block. Allocate a new block. + allocateNewBlock(); + } + + currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len); + if (current.isHealthy()) { try { super.writeChunk(bytes, offset, len, checksum, ckoff, cklen); } catch(Exception e) { @@ -401,12 +481,6 @@ public class DFSStripedOutputStream extends DFSOutputStream { } } - if (current.isFailed()) { - final long newBytes = oldBytes + len; - coordinator.setBytesEndBlock(index, newBytes, current.getBlock()); - current.setBytesCurBlock(newBytes); - } - // Two extra steps are needed when a striping cell is full: // 1. Forward the current index pointer // 2. Generate parity packets if a full stripe of data cells are present @@ -419,11 +493,209 @@ public class DFSStripedOutputStream extends DFSOutputStream { cellBuffers.flipDataBuffers(); writeParityCells(); next = 0; + // check failure state for all the streamers. 
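With allocateNewBlock() tracking progress on a single currentBlockGroup, shouldEndBlockGroup() and the failure-tolerance check in checkStreamers() earlier in the class reduce to simple arithmetic over the EC schema. A small back-of-the-envelope sketch with assumed RS(6,3) and 128 MB figures (example values, not read from the patch):

```java
// Back-of-the-envelope sketch of the block-group bookkeeping used by
// allocateNewBlock()/shouldEndBlockGroup(). RS(6,3) and 128 MB are examples.
public class BlockGroupMath {
  public static void main(String[] args) {
    final int numDataBlocks = 6;       // RS(6,3) data units (example)
    final int numParityBlocks = 3;     // RS(6,3) parity units (example)
    final long blockSize = 128L << 20; // 128 MB internal block size (example)

    // shouldEndBlockGroup(): the group counts only user (data) bytes,
    // so it is full after numDataBlocks * blockSize bytes have been written.
    long fullGroupBytes = blockSize * numDataBlocks;
    System.out.println("block group ends after " + (fullGroupBytes >> 20) + " MB of data");

    // checkStreamers(): writing can survive as many failed streamers as
    // there are parity blocks; one more failure aborts the stream.
    int maxTolerated = (numDataBlocks + numParityBlocks) - numDataBlocks;
    System.out.println("tolerated streamer failures: " + maxTolerated);
  }
}
```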
Bump GS if necessary + checkStreamerFailures(); + + // if this is the end of the block group, end each internal block + if (shouldEndBlockGroup()) { + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer s = setCurrentStreamer(i); + if (s.isHealthy()) { + try { + endBlock(); + } catch (IOException ignored) {} + } + } + } } setCurrentStreamer(next); } } + @Override + void enqueueCurrentPacketFull() throws IOException { + LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={}," + + " appendChunk={}, {}", currentPacket, src, getStreamer() + .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(), + getStreamer()); + enqueueCurrentPacket(); + adjustChunkBoundary(); + // no need to end block here + } + + private Set markExternalErrorOnStreamers() { + Set healthySet = new HashSet<>(); + for (StripedDataStreamer streamer : streamers) { + if (streamer.isHealthy() && + streamer.getStage() == BlockConstructionStage.DATA_STREAMING) { + streamer.setExternalError(); + healthySet.add(streamer); + } + } + return healthySet; + } + + /** + * Check and handle data streamer failures. This is called only when we have + * written a full stripe (i.e., enqueue all packets for a full stripe), or + * when we're closing the outputstream. + */ + private void checkStreamerFailures() throws IOException { + Set newFailed = checkStreamers(); + if (newFailed.size() > 0) { + // for healthy streamers, wait till all of them have fetched the new block + // and flushed out all the enqueued packets. + flushAllInternals(); + } + // get all the current failed streamers after the flush + newFailed = checkStreamers(); + while (newFailed.size() > 0) { + failedStreamers.addAll(newFailed); + coordinator.clearFailureStates(); + + // mark all the healthy streamers as external error + Set healthySet = markExternalErrorOnStreamers(); + + // we have newly failed streamers, update block for pipeline + final ExtendedBlock newBG = updateBlockForPipeline(healthySet); + + // wait till all the healthy streamers to + // 1) get the updated block info + // 2) create new block outputstream + newFailed = waitCreatingNewStreams(healthySet); + if (newFailed.size() + failedStreamers.size() > + numAllBlocks - numDataBlocks) { + throw new IOException( + "Data streamers failed while creating new block streams: " + + newFailed + ". 
There are not enough healthy streamers."); + } + for (StripedDataStreamer failedStreamer : newFailed) { + assert !failedStreamer.isHealthy(); + } + + // TODO we can also succeed if all the failed streamers have not taken + // the updated block + if (newFailed.size() == 0) { + // reset external error state of all the streamers + for (StripedDataStreamer streamer : healthySet) { + assert streamer.isHealthy(); + streamer.getErrorState().reset(); + } + updatePipeline(newBG); + } + for (int i = 0; i < numAllBlocks; i++) { + coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0); + } + } + } + + private int checkStreamerUpdates(Set failed, + Set streamers) { + for (StripedDataStreamer streamer : streamers) { + if (!coordinator.updateStreamerMap.containsKey(streamer)) { + if (!streamer.isHealthy() && + coordinator.getNewBlocks().peek(streamer.getIndex()) != null) { + // this streamer had internal error before getting updated block + failed.add(streamer); + } + } + } + return coordinator.updateStreamerMap.size() + failed.size(); + } + + private Set waitCreatingNewStreams( + Set healthyStreamers) throws IOException { + Set failed = new HashSet<>(); + final int expectedNum = healthyStreamers.size(); + final long socketTimeout = dfsClient.getConf().getSocketTimeout(); + // the total wait time should be less than the socket timeout, otherwise + // a slow streamer may cause other streamers to timeout. here we wait for + // half of the socket timeout + long remaingTime = socketTimeout > 0 ? socketTimeout/2 : Long.MAX_VALUE; + final long waitInterval = 1000; + synchronized (coordinator) { + while (checkStreamerUpdates(failed, healthyStreamers) < expectedNum + && remaingTime > 0) { + try { + long start = Time.monotonicNow(); + coordinator.wait(waitInterval); + remaingTime -= Time.monotonicNow() - start; + } catch (InterruptedException e) { + throw DFSUtil.toInterruptedIOException("Interrupted when waiting" + + " for results of updating striped streamers", e); + } + } + } + synchronized (coordinator) { + for (StripedDataStreamer streamer : healthyStreamers) { + if (!coordinator.updateStreamerMap.containsKey(streamer)) { + // close the streamer if it is too slow to create new connection + streamer.setStreamerAsClosed(); + failed.add(streamer); + } + } + } + for (Map.Entry entry : + coordinator.updateStreamerMap.entrySet()) { + if (!entry.getValue()) { + failed.add(entry.getKey()); + } + } + for (StripedDataStreamer failedStreamer : failed) { + healthyStreamers.remove(failedStreamer); + } + return failed; + } + + /** + * Call {@link ClientProtocol#updateBlockForPipeline} and assign updated block + * to healthy streamers. + * @param healthyStreamers The healthy data streamers. These streamers join + * the failure handling. 
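waitCreatingNewStreams() above waits on the coordinator monitor in one-second slices and gives up once roughly half the socket timeout has elapsed, so a single slow streamer cannot stall the others indefinitely. A self-contained sketch of that bounded wait/notify pattern; the class, method, and timeout names here are invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the bounded wait in waitCreatingNewStreams(): the coordinating
// thread waits in slices for streamers to report, but never past a deadline.
public class BoundedWaitSketch {
  private final Object lock = new Object();
  private final Map<Integer, Boolean> results = new HashMap<>();

  void report(int streamerIndex, boolean success) {
    synchronized (lock) {
      results.put(streamerIndex, success);
      lock.notifyAll();                 // wake the waiting coordinator
    }
  }

  Map<Integer, Boolean> await(int expected, long deadlineMs) throws InterruptedException {
    long remaining = deadlineMs;
    final long slice = 1000;            // wait in 1s slices, like the patch
    synchronized (lock) {
      while (results.size() < expected && remaining > 0) {
        long start = System.nanoTime();
        lock.wait(Math.min(slice, remaining));
        remaining -= (System.nanoTime() - start) / 1_000_000;
      }
      return new HashMap<>(results);    // slow streamers simply miss the cut
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BoundedWaitSketch c = new BoundedWaitSketch();
    new Thread(() -> c.report(0, true)).start();
    System.out.println(c.await(2, 500)); // returns after ~500 ms with {0=true}
  }
}
```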
+ */ + private ExtendedBlock updateBlockForPipeline( + Set healthyStreamers) throws IOException { + final LocatedBlock updated = dfsClient.namenode.updateBlockForPipeline( + currentBlockGroup, dfsClient.clientName); + final long newGS = updated.getBlock().getGenerationStamp(); + ExtendedBlock newBlock = new ExtendedBlock(currentBlockGroup); + newBlock.setGenerationStamp(newGS); + final LocatedBlock[] updatedBlks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock) updated, cellSize, numDataBlocks, + numAllBlocks - numDataBlocks); + + for (int i = 0; i < numAllBlocks; i++) { + StripedDataStreamer si = getStripedDataStreamer(i); + if (healthyStreamers.contains(si)) { + final LocatedBlock lb = new LocatedBlock(new ExtendedBlock(newBlock), + null, null, null, -1, updated.isCorrupt(), null); + lb.setBlockToken(updatedBlks[i].getBlockToken()); + coordinator.getNewBlocks().offer(i, lb); + } + } + return newBlock; + } + + private void updatePipeline(ExtendedBlock newBG) throws IOException { + final DatanodeInfo[] newNodes = new DatanodeInfo[numAllBlocks]; + final String[] newStorageIDs = new String[numAllBlocks]; + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer streamer = getStripedDataStreamer(i); + final DatanodeInfo[] nodes = streamer.getNodes(); + final String[] storageIDs = streamer.getStorageIDs(); + if (streamer.isHealthy() && nodes != null && storageIDs != null) { + newNodes[i] = nodes[0]; + newStorageIDs[i] = storageIDs[0]; + } else { + newNodes[i] = new DatanodeInfo(DatanodeID.EMPTY_DATANODE_ID); + newStorageIDs[i] = ""; + } + } + dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup, + newBG, newNodes, newStorageIDs); + currentBlockGroup = newBG; + } + private int stripeDataSize() { return numDataBlocks * cellSize; } @@ -500,28 +772,16 @@ public class DFSStripedOutputStream extends DFSOutputStream { } } - /** - * Simply add bytesCurBlock together. Note that this result is not accurately - * the size of the block group. - */ - private long getCurrentSumBytes() { - long sum = 0; - for (int i = 0; i < numDataBlocks; i++) { - sum += streamers.get(i).getBytesCurBlock(); - } - return sum; - } - private boolean generateParityCellsForLastStripe() { - final long currentBlockGroupBytes = getCurrentSumBytes(); - if (currentBlockGroupBytes % stripeDataSize() == 0) { + final long currentBlockGroupBytes = currentBlockGroup == null ? + 0 : currentBlockGroup.getNumBytes(); + final long lastStripeSize = currentBlockGroupBytes % stripeDataSize(); + if (lastStripeSize == 0) { return false; } - final int firstCellSize = - (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize); - final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize? - firstCellSize : cellSize; + final long parityCellSize = lastStripeSize < cellSize? 
+ lastStripeSize : cellSize; final ByteBuffer[] buffers = cellBuffers.getBuffers(); for (int i = 0; i < numAllBlocks; i++) { @@ -550,13 +810,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { cellBuffers.clear(); } - void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf - ) throws IOException { + void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf) + throws IOException { final StripedDataStreamer current = setCurrentStreamer(index); final int len = buffer.limit(); final long oldBytes = current.getBytesCurBlock(); - if (!current.isFailed()) { + if (current.isHealthy()) { try { DataChecksum sum = getDataChecksum(); sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0); @@ -570,18 +830,13 @@ public class DFSStripedOutputStream extends DFSOutputStream { handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e); } } - - if (current.isFailed()) { - final long newBytes = oldBytes + len; - current.setBytesCurBlock(newBytes); - } } @Override void setClosed() { super.setClosed(); for (int i = 0; i < numAllBlocks; i++) { - streamers.get(i).release(); + getStripedDataStreamer(i).release(); } cellBuffers.release(); } @@ -607,37 +862,40 @@ public class DFSStripedOutputStream extends DFSOutputStream { try { // flush from all upper layers - try { - flushBuffer(); - } catch(Exception e) { - handleStreamerFailure("flushBuffer " + getCurrentStreamer(), e); - } + flushBuffer(); // if the last stripe is incomplete, generate and write parity cells if (generateParityCellsForLastStripe()) { writeParityCells(); } enqueueAllCurrentPackets(); + // flush all the data packets + flushAllInternals(); + // check failures + checkStreamerFailures(); + for (int i = 0; i < numAllBlocks; i++) { final StripedDataStreamer s = setCurrentStreamer(i); - if (!s.isFailed()) { + if (s.isHealthy()) { try { if (s.getBytesCurBlock() > 0) { setCurrentPacketToEmpty(); } - // flush all data to Datanode + // flush the last "close" packet to Datanode flushInternal(); } catch(Exception e) { - handleStreamerFailure("flushInternal " + s, e, false); + // TODO for both close and endBlock, we currently do not handle + // failures when sending the last packet. We actually do not need to + // bump GS for this kind of failure. Thus counting the total number + // of failures may be good enough. 
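generateParityCellsForLastStripe() above now derives the parity cell length from currentBlockGroup rather than per-streamer byte counts: for a partial last stripe the parity cells are as long as its largest data cell, i.e. min(lastStripeSize, cellSize). A worked example with assumed 64 KB cells and RS(6,3); the figures are illustrative only:

```java
// Worked example for generateParityCellsForLastStripe(): parity cells of a
// partial stripe match its largest data cell. Cell size and RS(6,3) are examples.
public class LastStripeParitySketch {
  public static void main(String[] args) {
    final int cellSize = 64 * 1024;          // 64 KB cells (example)
    final int numDataBlocks = 6;             // RS(6,3) data units (example)
    final long blockGroupBytes = 150 * 1024; // 150 KB written so far (example)

    long stripeDataSize = (long) cellSize * numDataBlocks;   // 384 KB per stripe
    long lastStripeSize = blockGroupBytes % stripeDataSize;  // 150 KB left over
    // patch logic: min(lastStripeSize, cellSize)
    long parityCellSize = lastStripeSize < cellSize ? lastStripeSize : cellSize;

    System.out.println("last stripe holds " + lastStripeSize / 1024 + " KB of data");
    System.out.println("each parity cell is encoded over "
        + parityCellSize / 1024 + " KB");    // 64 KB: the first data cell is full
  }
}
```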
} } } closeThreads(false); - final ExtendedBlock lastBlock = coordinator.getBlockGroup(); TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); try { - completeFile(lastBlock); + completeFile(currentBlockGroup); } finally { scope.close(); } @@ -652,14 +910,45 @@ public class DFSStripedOutputStream extends DFSOutputStream { int idx = streamers.indexOf(getCurrentStreamer()); for(int i = 0; i < streamers.size(); i++) { final StripedDataStreamer si = setCurrentStreamer(i); - if (!si.isFailed() && currentPacket != null) { + if (si.isHealthy() && currentPacket != null) { try { enqueueCurrentPacket(); } catch (IOException e) { - handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e, false); + handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e); } } } setCurrentStreamer(idx); } + + void flushAllInternals() throws IOException { + int current = getCurrentIndex(); + + for (int i = 0; i < numAllBlocks; i++) { + final StripedDataStreamer s = setCurrentStreamer(i); + if (s.isHealthy()) { + try { + // flush all data to Datanode + flushInternal(); + } catch(Exception e) { + handleStreamerFailure("flushInternal " + s, e); + } + } + } + setCurrentStreamer(current); + } + + static void sleep(long ms, String op) throws InterruptedIOException { + try { + Thread.sleep(ms); + } catch(InterruptedException ie) { + throw DFSUtil.toInterruptedIOException( + "Sleep interrupted during " + op, ie); + } + } + + @Override + ExtendedBlock getBlock() { + return currentBlockGroup; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index c478f1c3f7..a6eb01f989 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SU import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; @@ -46,16 +45,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; -import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; -import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; @@ -69,13 +64,10 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MultipleIOException; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; @@ -204,9 +196,12 @@ class DataStreamer extends Daemon { } } + enum ErrorType { + NONE, INTERNAL, EXTERNAL + } + static class ErrorState { - private boolean error = false; - private boolean externalError = false; + ErrorType error = ErrorType.NONE; private int badNodeIndex = -1; private int restartingNodeIndex = -1; private long restartingNodeDeadline = 0; @@ -216,36 +211,48 @@ class DataStreamer extends Daemon { this.datanodeRestartTimeout = datanodeRestartTimeout; } - synchronized void reset() { - error = false; - externalError = false; + synchronized void resetInternalError() { + if (hasInternalError()) { + error = ErrorType.NONE; + } badNodeIndex = -1; restartingNodeIndex = -1; restartingNodeDeadline = 0; } - synchronized boolean hasError() { - return error; + synchronized void reset() { + error = ErrorType.NONE; + badNodeIndex = -1; + restartingNodeIndex = -1; + restartingNodeDeadline = 0; } - synchronized boolean hasExternalErrorOnly() { - return error && externalError && !isNodeMarked(); + synchronized boolean hasInternalError() { + return error == ErrorType.INTERNAL; + } + + synchronized boolean hasExternalError() { + return error == ErrorType.EXTERNAL; + } + + synchronized boolean hasError() { + return error != ErrorType.NONE; } synchronized boolean hasDatanodeError() { - return error && (isNodeMarked() || externalError); + return error == ErrorType.INTERNAL && isNodeMarked(); } - synchronized void setError(boolean err) { - this.error = err; + synchronized void setInternalError() { + this.error = ErrorType.INTERNAL; } - synchronized void initExternalError() { - setError(true); - this.externalError = true; + synchronized void setExternalError() { + if (!hasInternalError()) { + this.error = ErrorType.EXTERNAL; + } } - synchronized void setBadNodeIndex(int index) { this.badNodeIndex = index; } @@ -306,14 +313,14 @@ class DataStreamer extends Daemon { } if (!isRestartingNode()) { - error = false; + error = ErrorType.NONE; } badNodeIndex = -1; } synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) { if (restartingNodeIndex >= 0) { - if (!error) { + if (error == ErrorType.NONE) { throw new IllegalStateException("error=false while checking" + " restarting node deadline"); } @@ -345,7 +352,7 @@ class DataStreamer extends Daemon { private volatile boolean streamerClosed = false; protected ExtendedBlock block; // its length is number of bytes acked - private Token accessToken; + protected Token accessToken; private DataOutputStream blockStream; private DataInputStream blockReplyStream; private ResponseProcessor response = null; @@ -355,7 +362,7 @@ class DataStreamer extends Daemon { private final ErrorState errorState; private BlockConstructionStage stage; // block construction stage - private long bytesSent = 0; // number of bytes that've been sent + protected long bytesSent = 0; // number of bytes that've been sent private final boolean isLazyPersistFile; /** Nodes have been used in the pipeline before and have failed. 
*/ @@ -378,13 +385,13 @@ class DataStreamer extends Daemon { protected final DFSClient dfsClient; protected final String src; /** Only for DataTransferProtocol.writeBlock(..) */ - private final DataChecksum checksum4WriteBlock; - private final Progressable progress; + final DataChecksum checksum4WriteBlock; + final Progressable progress; protected final HdfsFileStatus stat; // appending to existing partial block private volatile boolean appendChunk = false; // both dataQueue and ackQueue are protected by dataQueue lock - private final LinkedList dataQueue = new LinkedList<>(); + protected final LinkedList dataQueue = new LinkedList<>(); private final LinkedList ackQueue = new LinkedList<>(); private final AtomicReference cachingStrategy; private final ByteArrayManager byteArrayManager; @@ -401,7 +408,7 @@ class DataStreamer extends Daemon { CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10; private int lastCongestionBackoffTime; - private final LoadingCache excludedNodes; + protected final LoadingCache excludedNodes; private final String[] favoredNodes; private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, @@ -473,6 +480,10 @@ class DataStreamer extends Daemon { } } + void setAccessToken(Token t) { + this.accessToken = t; + } + private void setPipeline(LocatedBlock lb) { setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); } @@ -533,7 +544,7 @@ class DataStreamer extends Daemon { DFSPacket one; try { // process datanode IO errors if any - boolean doSleep = processDatanodeError(); + boolean doSleep = processDatanodeOrExternalError(); final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; synchronized (dataQueue) { @@ -696,7 +707,7 @@ class DataStreamer extends Daemon { } lastException.set(e); assert !(e instanceof NullPointerException); - errorState.setError(true); + errorState.setInternalError(); if (!errorState.isNodeMarked()) { // Not a datanode issue streamerClosed = true; @@ -837,6 +848,9 @@ class DataStreamer extends Daemon { } } + void setStreamerAsClosed() { + streamerClosed = true; + } private void checkClosed() throws IOException { if (streamerClosed) { @@ -857,7 +871,7 @@ class DataStreamer extends Daemon { } } - private void closeStream() { + void closeStream() { final MultipleIOException.Builder b = new MultipleIOException.Builder(); if (blockStream != null) { @@ -1037,7 +1051,7 @@ class DataStreamer extends Daemon { } catch (Exception e) { if (!responderClosed) { lastException.set(e); - errorState.setError(true); + errorState.setInternalError(); errorState.markFirstNodeIfNotMarked(); synchronized (dataQueue) { dataQueue.notifyAll(); @@ -1059,18 +1073,18 @@ class DataStreamer extends Daemon { } } + private boolean shouldHandleExternalError(){ + return errorState.hasExternalError() && blockStream != null; + } + /** * If this stream has encountered any errors, shutdown threads * and mark the stream as closed. * * @return true if it should sleep for a while after returning. */ - private boolean processDatanodeError() throws IOException { - if (!errorState.hasDatanodeError()) { - return false; - } - if (errorState.hasExternalErrorOnly() && block == null) { - // block is not yet initialized, handle external error later. 
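The ErrorState rework above distinguishes INTERNAL errors (detected by the streamer itself) from EXTERNAL ones (injected by the coordinator): setExternalError() never overrides an internal error, and resetInternalError() clears only internal errors, so a coordinator-driven recovery cannot hide a genuine datanode failure. A standalone sketch of that small state machine, under an illustrative class name rather than the patch's:

```java
// Sketch of the three-way error state: EXTERNAL must never mask INTERNAL,
// and only internal errors are cleared on recovery. Not the patch's class.
public class ErrorStateSketch {
  enum ErrorType { NONE, INTERNAL, EXTERNAL }

  private ErrorType error = ErrorType.NONE;

  synchronized void setInternalError() {
    error = ErrorType.INTERNAL;
  }

  synchronized void setExternalError() {
    if (error != ErrorType.INTERNAL) {   // internal errors take precedence
      error = ErrorType.EXTERNAL;
    }
  }

  synchronized void resetInternalError() {
    if (error == ErrorType.INTERNAL) {   // external errors survive this reset
      error = ErrorType.NONE;
    }
  }

  synchronized boolean hasError() {
    return error != ErrorType.NONE;
  }

  public static void main(String[] args) {
    ErrorStateSketch s = new ErrorStateSketch();
    s.setInternalError();
    s.setExternalError();                // ignored: internal already set
    s.resetInternalError();
    System.out.println("error after recovery: " + s.hasError()); // false
  }
}
```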
+ private boolean processDatanodeOrExternalError() throws IOException { + if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) { return false; } if (response != null) { @@ -1103,7 +1117,8 @@ class DataStreamer extends Daemon { return false; } } - boolean doSleep = setupPipelineForAppendOrRecovery(); + + setupPipelineForAppendOrRecovery(); if (!streamerClosed && dfsClient.clientRunning) { if (stage == BlockConstructionStage.PIPELINE_CLOSE) { @@ -1135,7 +1150,7 @@ class DataStreamer extends Daemon { } } - return doSleep; + return false; } void setHflush() { @@ -1266,7 +1281,7 @@ class DataStreamer extends Daemon { * This happens when a file is appended or data streaming fails * It keeps on trying until a pipeline is setup */ - private boolean setupPipelineForAppendOrRecovery() throws IOException { + private void setupPipelineForAppendOrRecovery() throws IOException { // check number of datanodes if (nodes == null || nodes.length == 0) { String msg = "Could not get block locations. " + "Source file \"" @@ -1274,19 +1289,23 @@ class DataStreamer extends Daemon { LOG.warn(msg); lastException.set(new IOException(msg)); streamerClosed = true; - return false; + return; } + setupPipelineInternal(nodes, storageTypes); + } + protected void setupPipelineInternal(DatanodeInfo[] datanodes, + StorageType[] nodeStorageTypes) throws IOException { boolean success = false; long newGS = 0L; while (!success && !streamerClosed && dfsClient.clientRunning) { if (!handleRestartingDatanode()) { - return false; + return; } - final boolean isRecovery = errorState.hasError(); + final boolean isRecovery = errorState.hasInternalError(); if (!handleBadDatanode()) { - return false; + return; } handleDatanodeReplacement(); @@ -1307,7 +1326,6 @@ class DataStreamer extends Daemon { if (success) { block = updatePipeline(newGS); } - return false; // do not sleep, continue processing } /** @@ -1315,7 +1333,7 @@ class DataStreamer extends Daemon { * This process is repeated until the deadline or the node starts back up. * @return true if it should continue. */ - private boolean handleRestartingDatanode() { + boolean handleRestartingDatanode() { if (errorState.isRestartingNode()) { // 4 seconds or the configured deadline period, whichever is shorter. // This is the retry interval and recovery will be retried in this @@ -1338,7 +1356,7 @@ class DataStreamer extends Daemon { * Remove bad node from list of nodes if badNodeIndex was set. * @return true if it should continue. 
*/ - private boolean handleBadDatanode() { + boolean handleBadDatanode() { final int badNodeIndex = errorState.getBadNodeIndex(); if (badNodeIndex >= 0) { if (nodes.length <= 1) { @@ -1388,7 +1406,7 @@ class DataStreamer extends Daemon { } } - private void failPacket4Testing() { + void failPacket4Testing() { if (failPacket) { // for testing failPacket = false; try { @@ -1400,13 +1418,8 @@ class DataStreamer extends Daemon { } } - LocatedBlock updateBlockForPipeline() throws IOException { - return callUpdateBlockForPipeline(block); - } - - LocatedBlock callUpdateBlockForPipeline(ExtendedBlock newBlock) throws IOException { - return dfsClient.namenode.updateBlockForPipeline( - newBlock, dfsClient.clientName); + private LocatedBlock updateBlockForPipeline() throws IOException { + return dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName); } static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) { @@ -1417,18 +1430,12 @@ class DataStreamer extends Daemon { /** update pipeline at the namenode */ ExtendedBlock updatePipeline(long newGS) throws IOException { final ExtendedBlock newBlock = newBlock(block, newGS); - return callUpdatePipeline(block, newBlock, nodes, storageIDs); - } - - ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock, - DatanodeInfo[] newNodes, String[] newStorageIDs) - throws IOException { - dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock, - newNodes, newStorageIDs); + dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, + nodes, storageIDs); return newBlock; } - int getNumBlockWriteRetry() { + private int getNumBlockWriteRetry() { return dfsClient.getConf().getNumBlockWriteRetry(); } @@ -1438,7 +1445,7 @@ class DataStreamer extends Daemon { * Must get block ID and the IDs of the destinations from the namenode. * Returns the list of target datanodes. */ - private LocatedBlock nextBlockOutputStream() throws IOException { + protected LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb = null; DatanodeInfo[] nodes = null; StorageType[] storageTypes = null; @@ -1446,9 +1453,8 @@ class DataStreamer extends Daemon { boolean success = false; ExtendedBlock oldBlock = block; do { - errorState.reset(); + errorState.resetInternalError(); lastException.clear(); - success = false; DatanodeInfo[] excluded = excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) @@ -1488,7 +1494,7 @@ class DataStreamer extends Daemon { // connects to the first datanode in the pipeline // Returns true if success, otherwise return failure. 
// - private boolean createBlockOutputStream(DatanodeInfo[] nodes, + boolean createBlockOutputStream(DatanodeInfo[] nodes, StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { if (nodes.length == 0) { LOG.info("nodes are empty for write pipeline of " + block); @@ -1567,7 +1573,7 @@ class DataStreamer extends Daemon { assert null == blockStream : "Previous blockStream unclosed"; blockStream = out; result = true; // success - errorState.reset(); + errorState.resetInternalError(); } catch (IOException ie) { if (!errorState.isRestartingNode()) { LOG.info("Exception in createBlockOutputStream " + this, ie); @@ -1603,7 +1609,7 @@ class DataStreamer extends Daemon { if (checkRestart && shouldWaitForRestart(i)) { errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]); } - errorState.setError(true); + errorState.setInternalError(); lastException.set(ie); result = false; // error } finally { @@ -1645,58 +1651,10 @@ class DataStreamer extends Daemon { } } - LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) + private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { - final DfsClientConf conf = dfsClient.getConf(); - int retries = conf.getNumBlockWriteLocateFollowingRetry(); - long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); - while (true) { - long localstart = Time.monotonicNow(); - while (true) { - try { - return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, stat.getFileId(), favoredNodes); - } catch (RemoteException e) { - IOException ue = - e.unwrapRemoteException(FileNotFoundException.class, - AccessControlException.class, - NSQuotaExceededException.class, - DSQuotaExceededException.class, - QuotaByStorageTypeExceededException.class, - UnresolvedPathException.class); - if (ue != e) { - throw ue; // no need to retry these exceptions - } - - - if (NotReplicatedYetException.class.getName(). 
- equals(e.getClassName())) { - if (retries == 0) { - throw e; - } else { - --retries; - LOG.info("Exception while adding a block", e); - long elapsed = Time.monotonicNow() - localstart; - if (elapsed > 5000) { - LOG.info("Waiting for replication for " - + (elapsed / 1000) + " seconds"); - } - try { - LOG.warn("NotReplicatedYetException sleeping " + src - + " retries left " + retries); - Thread.sleep(sleeptime); - sleeptime *= 2; - } catch (InterruptedException ie) { - LOG.warn("Caught exception", ie); - } - } - } else { - throw e; - } - - } - } - } + return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block, + stat.getFileId(), favoredNodes); } /** @@ -1755,6 +1713,10 @@ class DataStreamer extends Daemon { return storageIDs; } + BlockConstructionStage getStage() { + return stage; + } + /** * return the token of the block * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index 2f83f7c641..a313ecb189 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -19,18 +19,15 @@ package org.apache.hadoop.hdfs; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator; -import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.util.ByteArrayManager; -import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; @@ -46,66 +43,8 @@ import com.google.common.annotations.VisibleForTesting; * other streamers. */ public class StripedDataStreamer extends DataStreamer { - /** - * This class is designed for multiple threads to share a - * {@link MultipleBlockingQueue}. Initially, the queue is empty. The earliest - * thread calling poll populates entries to the queue and the other threads - * will wait for it. Once the entries are populated, all the threads can poll - * their entries. - * - * @param the queue entry type. - */ - static abstract class ConcurrentPoll { - final MultipleBlockingQueue queue; - - ConcurrentPoll(MultipleBlockingQueue queue) { - this.queue = queue; - } - - T poll(final int i) throws IOException { - for(;;) { - synchronized(queue) { - final T polled = queue.poll(i); - if (polled != null) { // already populated; return polled item. - return polled; - } - if (isReady2Populate()) { - try { - populate(); - return queue.poll(i); - } catch(IOException ioe) { - LOG.warn("Failed to populate, " + this, ioe); - throw ioe; - } - } - } - - // sleep and then retry. 
- sleep(100, "poll"); - } - } - - boolean isReady2Populate() { - return queue.isEmpty(); - } - - abstract void populate() throws IOException; - } - - private static void sleep(long ms, String op) throws InterruptedIOException { - try { - Thread.sleep(ms); - } catch(InterruptedException ie) { - throw DFSUtil.toInterruptedIOException( - "Sleep interrupted during " + op, ie); - } - } - private final Coordinator coordinator; private final int index; - private volatile boolean failed; - private final ECSchema schema; - private final int cellSize; StripedDataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, @@ -117,102 +56,59 @@ public class StripedDataStreamer extends DataStreamer { byteArrayManage, favoredNodes); this.index = index; this.coordinator = coordinator; - this.schema = stat.getErasureCodingPolicy().getSchema(); - this.cellSize = stat.getErasureCodingPolicy().getCellSize(); } int getIndex() { return index; } - void setFailed(boolean failed) { - this.failed = failed; - } - - boolean isFailed() { - return failed; - } - - private boolean isParityStreamer() { - return index >= schema.getNumDataUnits(); + boolean isHealthy() { + return !streamerClosed() && !getErrorState().hasInternalError(); } @Override protected void endBlock() { - if (!isParityStreamer()) { - coordinator.offerEndBlock(index, block); - } + coordinator.offerEndBlock(index, block); super.endBlock(); } - @Override - int getNumBlockWriteRetry() { - return 0; + /** + * The upper level DFSStripedOutputStream will allocate the new block group. + * All the striped data streamer only needs to fetch from the queue, which + * should be already be ready. + */ + private LocatedBlock getFollowingBlock() throws IOException { + if (!this.isHealthy()) { + // No internal block for this streamer, maybe no enough healthy DN. + // Throw the exception which has been set by the StripedOutputStream. + this.getLastException().check(false); + } + return coordinator.getFollowingBlocks().poll(index); } @Override - LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes) - throws IOException { - return new ConcurrentPoll(coordinator.getFollowingBlocks()) { - @Override - boolean isReady2Populate() { - return super.isReady2Populate() - && (block == null || coordinator.hasAllEndBlocks()); - } + protected LocatedBlock nextBlockOutputStream() throws IOException { + boolean success; + LocatedBlock lb = getFollowingBlock(); + block = lb.getBlock(); + block.setNumBytes(0); + bytesSent = 0; + accessToken = lb.getBlockToken(); - @Override - void populate() throws IOException { - getLastException().check(false); + DatanodeInfo[] nodes = lb.getLocations(); + StorageType[] storageTypes = lb.getStorageTypes(); - if (block != null) { - // set numByte for the previous block group - long bytes = 0; - for (int i = 0; i < schema.getNumDataUnits(); i++) { - final ExtendedBlock b = coordinator.takeEndBlock(i); - StripedBlockUtil.checkBlocks(index, block, i, b); - bytes += b.getNumBytes(); - } - block.setNumBytes(bytes); - block.setBlockId(block.getBlockId() - index); - } + // Connect to the DataNode. If fail the internal error state will be set. 
+ success = createBlockOutputStream(nodes, storageTypes, 0L, false); - if (LOG.isDebugEnabled()) { - LOG.debug("locateFollowingBlock: index=" + index + ", block=" + block); - } - - final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock( - excludedNodes); - if (lb.getLocations().length < schema.getNumDataUnits()) { - throw new IOException( - "Failed to get datablocks number of nodes from namenode: blockGroupSize= " - + (schema.getNumDataUnits() + schema.getNumParityUnits()) - + ", blocks.length= " + lb.getLocations().length); - } - final LocatedBlock[] blocks = - StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) lb, - cellSize, schema.getNumDataUnits(), schema.getNumParityUnits()); - - for (int i = 0; i < blocks.length; i++) { - StripedDataStreamer si = coordinator.getStripedDataStreamer(i); - if (si.isFailed()) { - continue; // skipping failed data streamer - } - if (blocks[i] == null) { - // Set exception and close streamer as there is no block locations - // found for the parity block. - LOG.warn("Failed to get block location for parity block, index=" - + i); - si.getLastException().set( - new IOException("Failed to get following block, i=" + i)); - si.setFailed(true); - si.endBlock(); - si.close(true); - } else { - queue.offer(i, blocks[i]); - } - } - } - }.poll(index); + if (!success) { + block = null; + final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()]; + LOG.info("Excluding datanode " + badNode); + excludedNodes.put(badNode, badNode); + throw new IOException("Unable to create new block."); + } + return lb; } @VisibleForTesting @@ -221,119 +117,71 @@ public class StripedDataStreamer extends DataStreamer { } @Override - LocatedBlock updateBlockForPipeline() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("updateBlockForPipeline(), " + this); - } - return new ConcurrentPoll(coordinator.getNewBlocks()) { - @Override - void populate() throws IOException { - final ExtendedBlock bg = coordinator.getBlockGroup(); - final LocatedBlock updated = callUpdateBlockForPipeline(bg); - final long newGS = updated.getBlock().getGenerationStamp(); - final LocatedBlock[] updatedBlks = StripedBlockUtil - .parseStripedBlockGroup((LocatedStripedBlock) updated, cellSize, - schema.getNumDataUnits(), schema.getNumParityUnits()); - for (int i = 0; i < schema.getNumDataUnits() - + schema.getNumParityUnits(); i++) { - StripedDataStreamer si = coordinator.getStripedDataStreamer(i); - if (si.isFailed()) { - continue; // skipping failed data streamer - } - final ExtendedBlock bi = si.getBlock(); - if (bi != null) { - final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS), - null, null, null, -1, updated.isCorrupt(), null); - lb.setBlockToken(updatedBlks[i].getBlockToken()); - queue.offer(i, lb); - } else { - final MultipleBlockingQueue followingBlocks - = coordinator.getFollowingBlocks(); - synchronized(followingBlocks) { - final LocatedBlock lb = followingBlocks.peek(i); - if (lb != null) { - lb.getBlock().setGenerationStamp(newGS); - si.getErrorState().reset(); - continue; - } - } + protected void setupPipelineInternal(DatanodeInfo[] nodes, + StorageType[] nodeStorageTypes) throws IOException { + boolean success = false; + while (!success && !streamerClosed() && dfsClient.clientRunning) { + if (!handleRestartingDatanode()) { + return; + } + if (!handleBadDatanode()) { + // for striped streamer if it is datanode error then close the stream + // and return. 
no need to replace datanode + return; + } - //streamer i just have polled the block, sleep and retry. - sleep(100, "updateBlockForPipeline, " + this); - i--; - } + // get a new generation stamp and an access token + final LocatedBlock lb = coordinator.getNewBlocks().take(index); + long newGS = lb.getBlock().getGenerationStamp(); + setAccessToken(lb.getBlockToken()); + + // set up the pipeline again with the remaining nodes. when a striped + // data streamer comes here, it must be in external error state. + assert getErrorState().hasExternalError(); + success = createBlockOutputStream(nodes, nodeStorageTypes, newGS, true); + + failPacket4Testing(); + getErrorState().checkRestartingNodeDeadline(nodes); + + // notify coordinator the result of createBlockOutputStream + synchronized (coordinator) { + if (!streamerClosed()) { + coordinator.updateStreamer(this, success); + coordinator.notify(); + } else { + success = false; } } - }.poll(index); + + if (success) { + // wait for results of other streamers + success = coordinator.takeStreamerUpdateResult(index); + if (success) { + // if all succeeded, update its block using the new GS + block = newBlock(block, newGS); + } else { + // otherwise close the block stream and restart the recovery process + closeStream(); + } + } else { + // if fail, close the stream. The internal error state and last + // exception have already been set in createBlockOutputStream + // TODO: wait for restarting DataNodes during RollingUpgrade + closeStream(); + setStreamerAsClosed(); + } + } // while } - @Override - ExtendedBlock updatePipeline(final long newGS) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("updatePipeline(newGS=" + newGS + "), " + this); + void setExternalError() { + getErrorState().setExternalError(); + synchronized (dataQueue) { + dataQueue.notifyAll(); } - return new ConcurrentPoll(coordinator.getUpdateBlocks()) { - @Override - void populate() throws IOException { - final MultipleBlockingQueue followingBlocks - = coordinator.getFollowingBlocks(); - final ExtendedBlock bg = coordinator.getBlockGroup(); - final ExtendedBlock newBG = newBlock(bg, newGS); - - final int n = schema.getNumDataUnits() + schema.getNumParityUnits(); - final DatanodeInfo[] newNodes = new DatanodeInfo[n]; - final String[] newStorageIDs = new String[n]; - for (int i = 0; i < n; i++) { - final StripedDataStreamer si = coordinator.getStripedDataStreamer(i); - DatanodeInfo[] nodes = si.getNodes(); - String[] storageIDs = si.getStorageIDs(); - if (nodes == null || storageIDs == null) { - synchronized(followingBlocks) { - final LocatedBlock lb = followingBlocks.peek(i); - if (lb != null) { - nodes = lb.getLocations(); - storageIDs = lb.getStorageIDs(); - } - } - } - if (nodes != null && storageIDs != null) { - newNodes[i] = nodes[0]; - newStorageIDs[i] = storageIDs[0]; - } else { - //streamer i just have polled the block, sleep and retry. 
- sleep(100, "updatePipeline, " + this); - i--; - } - } - final ExtendedBlock updated = callUpdatePipeline(bg, newBG, newNodes, - newStorageIDs); - - for (int i = 0; i < n; i++) { - final StripedDataStreamer si = coordinator.getStripedDataStreamer(i); - final ExtendedBlock bi = si.getBlock(); - if (bi != null) { - queue.offer(i, newBlock(bi, updated.getGenerationStamp())); - } else if (!si.isFailed()) { - synchronized(followingBlocks) { - final LocatedBlock lb = followingBlocks.peek(i); - if (lb != null) { - lb.getBlock().setGenerationStamp(newGS); - si.getErrorState().reset(); - continue; - } - } - - //streamer i just have polled the block, sleep and retry. - sleep(100, "updatePipeline, " + this); - i--; - } - } - } - }.poll(index); } @Override public String toString() { - return "#" + index + ": " + (failed? "failed, ": "") + super.toString(); + return "#" + index + ": " + (!isHealthy() ? "failed, ": "") + super.toString(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java index 0e927797b3..1d4cff3340 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java @@ -68,16 +68,28 @@ public class BlockUnderConstructionFeature { /** Set expected locations */ public void setExpectedLocations(Block block, DatanodeStorageInfo[] targets, boolean isStriped) { - int numLocations = targets == null ? 0 : targets.length; + if (targets == null) { + return; + } + int numLocations = 0; + for (DatanodeStorageInfo target : targets) { + if (target != null) { + numLocations++; + } + } + this.replicas = new ReplicaUnderConstruction[numLocations]; - for(int i = 0; i < numLocations; i++) { - // when creating a new striped block we simply sequentially assign block - // index to each storage - Block replicaBlock = isStriped ? - new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) : - block; - replicas[i] = new ReplicaUnderConstruction(replicaBlock, targets[i], - ReplicaState.RBW); + int offset = 0; + for(int i = 0; i < targets.length; i++) { + if (targets[i] != null) { + // when creating a new striped block we simply sequentially assign block + // index to each storage + Block replicaBlock = isStriped ? 
+ new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) : + block; + replicas[offset++] = new ReplicaUnderConstruction(replicaBlock, + targets[i], ReplicaState.RBW); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index b5b3b9727d..61c6386f14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -513,6 +513,10 @@ public class DatanodeManager { } final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[datanodeID.length]; for(int i = 0; i < datanodeID.length; i++) { + if (datanodeID[i].equals(DatanodeID.EMPTY_DATANODE_ID)) { + storages[i] = null; + continue; + } final DatanodeDescriptor dd = getDatanode(datanodeID[i]); storages[i] = dd.getStorageInfo(storageIDs[i]); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 5af35853d2..d49d39bf19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -925,22 +925,21 @@ public class StripedBlockUtil { /** * Check if the information such as IDs and generation stamps in block-i - * match block-j, where block-i and block-j are in the same group. + * match the block group. */ - public static void checkBlocks(int j, ExtendedBlock blockj, + public static void checkBlocks(ExtendedBlock blockGroup, int i, ExtendedBlock blocki) throws IOException { - - if (!blocki.getBlockPoolId().equals(blockj.getBlockPoolId())) { - throw new IOException("Block pool IDs mismatched: block" + j + "=" - + blockj + ", block" + i + "=" + blocki); + if (!blocki.getBlockPoolId().equals(blockGroup.getBlockPoolId())) { + throw new IOException("Block pool IDs mismatched: block" + i + "=" + + blocki + ", expected block group=" + blockGroup); } - if (blocki.getBlockId() - i != blockj.getBlockId() - j) { - throw new IOException("Block IDs mismatched: block" + j + "=" - + blockj + ", block" + i + "=" + blocki); + if (blocki.getBlockId() - i != blockGroup.getBlockId()) { + throw new IOException("Block IDs mismatched: block" + i + "=" + + blocki + ", expected block group=" + blockGroup); } - if (blocki.getGenerationStamp() != blockj.getGenerationStamp()) { - throw new IOException("Generation stamps mismatched: block" + j + "=" - + blockj + ", block" + i + "=" + blocki); + if (blocki.getGenerationStamp() != blockGroup.getGenerationStamp()) { + throw new IOException("Generation stamps mismatched: block" + i + "=" + + blocki + ", expected block group=" + blockGroup); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 274d3195fb..e621f26d0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1988,35 +1988,14 @@ public class DFSTestUtil { */ public static ExtendedBlock 
flushInternal(DFSStripedOutputStream out) throws IOException { - out.flushInternal(); + out.flushAllInternals(); return out.getBlock(); } - /** - * Verify that blocks in striped block group are on different nodes, and every - * internal blocks exists. - */ - public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, - int groupSize) { - for (LocatedBlock lb : lbs.getLocatedBlocks()) { - assert lb instanceof LocatedStripedBlock; - HashSet locs = new HashSet<>(); - for (DatanodeInfo datanodeInfo : lb.getLocations()) { - locs.add(datanodeInfo); - } - assertEquals(groupSize, lb.getLocations().length); - assertEquals(groupSize, locs.size()); - - // verify that every internal blocks exists - int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices(); - assertEquals(groupSize, blockIndices.length); - HashSet found = new HashSet<>(); - for (int index : blockIndices) { - assert index >=0; - found.add(index); - } - assertEquals(groupSize, found.size()); - } + public static ExtendedBlock flushBuffer(DFSStripedOutputStream out) + throws IOException { + out.flush(); + return out.getBlock(); } public static void waitForMetric(final JMXGet jmx, final String metricName, final int expectedValue) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java index 8d4a0cf5c9..12453fafb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java @@ -20,23 +20,35 @@ package org.apache.hadoop.hdfs; import com.google.common.base.Joiner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.web.ByteRangeInputStream; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.junit.Assert; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Random; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertEquals; + public class StripedFileTestUtil { public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class); /* @@ -50,8 +62,8 @@ public class StripedFileTestUtil { static final int stripesPerBlock = 4; static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock; static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2; + static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS; - static final Random random = new Random(); static byte[] generateBytes(int cnt) { byte[] bytes = new byte[cnt]; @@ -61,6 +73,11 @@ public class StripedFileTestUtil { return bytes; } + 
static byte getByte(long pos) { + final int mod = 29; + return (byte) (pos % mod + 1); + } + static int readAll(FSDataInputStream in, byte[] buf) throws IOException { int readLen = 0; int ret; @@ -71,15 +88,10 @@ public class StripedFileTestUtil { return readLen; } - static byte getByte(long pos) { - final int mod = 29; - return (byte) (pos % mod + 1); - } - static void verifyLength(FileSystem fs, Path srcPath, int fileLength) throws IOException { FileStatus status = fs.getFileStatus(srcPath); - Assert.assertEquals("File length should be the same", fileLength, status.getLen()); + assertEquals("File length should be the same", fileLength, status.getLen()); } static void verifyPread(FileSystem fs, Path srcPath, int fileLength, @@ -101,9 +113,7 @@ public class StripedFileTestUtil { offset += target; } for (int i = 0; i < fileLength - startOffset; i++) { - Assert.assertEquals("Byte at " + (startOffset + i) + " is different, " - + "the startOffset is " + startOffset, - expected[startOffset + i], result[i]); + assertEquals("Byte at " + (startOffset + i) + " is different, " + "the startOffset is " + startOffset, expected[startOffset + i], result[i]); } } } @@ -119,8 +129,7 @@ public class StripedFileTestUtil { System.arraycopy(buf, 0, result, readLen, ret); readLen += ret; } - Assert.assertEquals("The length of file should be the same to write size", - fileLength, readLen); + assertEquals("The length of file should be the same to write size", fileLength, readLen); Assert.assertArrayEquals(expected, result); } } @@ -137,8 +146,7 @@ public class StripedFileTestUtil { result.put(buf); buf.clear(); } - Assert.assertEquals("The length of file should be the same to write size", - fileLength, readLen); + assertEquals("The length of file should be the same to write size", fileLength, readLen); Assert.assertArrayEquals(expected, result.array()); } } @@ -199,10 +207,9 @@ public class StripedFileTestUtil { fsdis.seek(pos); byte[] buf = new byte[writeBytes]; int readLen = StripedFileTestUtil.readAll(fsdis, buf); - Assert.assertEquals(readLen, writeBytes - pos); + assertEquals(readLen, writeBytes - pos); for (int i = 0; i < readLen; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", - StripedFileTestUtil.getByte(pos + i), buf[i]); + assertEquals("Byte at " + i + " should be the same", StripedFileTestUtil.getByte(pos + i), buf[i]); } } @@ -210,6 +217,7 @@ public class StripedFileTestUtil { final int dnIndex, final AtomicInteger pos) { final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex); final DatanodeInfo datanode = getDatanodes(s); + assert datanode != null; LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos); cluster.stopDataNode(datanode.getXferAddr()); } @@ -218,7 +226,7 @@ public class StripedFileTestUtil { for(;;) { final DatanodeInfo[] datanodes = streamer.getNodes(); if (datanodes != null) { - Assert.assertEquals(1, datanodes.length); + assertEquals(1, datanodes.length); Assert.assertNotNull(datanodes[0]); return datanodes[0]; } @@ -287,7 +295,6 @@ public class StripedFileTestUtil { * @param min minimum of the range * @param max maximum of the range * @param n number to be generated - * @return */ public static int[] randomArray(int min, int max, int n){ if (n > (max - min + 1) || max < min || min < 0 || max < 0) { @@ -315,4 +322,170 @@ public class StripedFileTestUtil { } return result; } + + /** + * Verify that blocks in striped block group are on different nodes, and every + * internal blocks exists. 
+ */ + public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, int groupSize) { + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + assert lb instanceof LocatedStripedBlock; + HashSet locs = new HashSet<>(); + Collections.addAll(locs, lb.getLocations()); + assertEquals(groupSize, lb.getLocations().length); + assertEquals(groupSize, locs.size()); + + // verify that every internal blocks exists + int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices(); + assertEquals(groupSize, blockIndices.length); + HashSet found = new HashSet<>(); + for (int index : blockIndices) { + assert index >=0; + found.add(index); + } + assertEquals(groupSize, found.size()); + } + } + + static void checkData(DistributedFileSystem dfs, Path srcPath, int length, + int[] killedDnIndex, long oldGS) throws IOException { + + StripedFileTestUtil.verifyLength(dfs, srcPath, length); + Arrays.sort(killedDnIndex); + List> blockGroupList = new ArrayList<>(); + LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(srcPath.toString(), 0L, + Long.MAX_VALUE); + int expectedNumGroup = 0; + if (length > 0) { + expectedNumGroup = (length - 1) / BLOCK_GROUP_SIZE + 1; + } + assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size()); + + for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { + Assert.assertTrue(firstBlock instanceof LocatedStripedBlock); + + final long gs = firstBlock.getBlock().getGenerationStamp(); + final String s = "gs=" + gs + ", oldGS=" + oldGS; + LOG.info(s); + Assert.assertTrue(s, gs >= oldGS); + + LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock) firstBlock, BLOCK_STRIPED_CELL_SIZE, + NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + blockGroupList.add(Arrays.asList(blocks)); + } + + // test each block group + for (int group = 0; group < blockGroupList.size(); group++) { + final boolean isLastGroup = group == blockGroupList.size() - 1; + final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE + : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE; + final int numCellInGroup = (groupSize - 1)/BLOCK_STRIPED_CELL_SIZE + 1; + final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS; + final int lastCellSize = groupSize - (numCellInGroup - 1)*BLOCK_STRIPED_CELL_SIZE; + + //get the data of this block + List blockList = blockGroupList.get(group); + byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][]; + byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][]; + + // for each block, use BlockReader to read data + for (int i = 0; i < blockList.size(); i++) { + final int j = i >= NUM_DATA_BLOCKS? 0: i; + final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS + + (j <= lastCellIndex? 1: 0); + final int blockSize = numCellInBlock*BLOCK_STRIPED_CELL_SIZE + + (isLastGroup && j == lastCellIndex? 
lastCellSize - BLOCK_STRIPED_CELL_SIZE: 0); + + final byte[] blockBytes = new byte[blockSize]; + if (i < NUM_DATA_BLOCKS) { + dataBlockBytes[i] = blockBytes; + } else { + parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes; + } + + final LocatedBlock lb = blockList.get(i); + LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock + + ", blockSize=" + blockSize + ", lb=" + lb); + if (lb == null) { + continue; + } + final ExtendedBlock block = lb.getBlock(); + assertEquals(blockSize, block.getNumBytes()); + + if (block.getNumBytes() == 0) { + continue; + } + + if (Arrays.binarySearch(killedDnIndex, i) < 0) { + final BlockReader blockReader = BlockReaderTestUtil.getBlockReader( + dfs, lb, 0, block.getNumBytes()); + blockReader.readAll(blockBytes, 0, (int) block.getNumBytes()); + blockReader.close(); + } + } + + // check data + final int groupPosInFile = group*BLOCK_GROUP_SIZE; + for (int i = 0; i < dataBlockBytes.length; i++) { + boolean killed = false; + if (Arrays.binarySearch(killedDnIndex, i) >= 0){ + killed = true; + } + final byte[] actual = dataBlockBytes[i]; + for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) { + final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG( + BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile; + Assert.assertTrue(posInFile < length); + final byte expected = getByte(posInFile); + + if (killed) { + actual[posInBlk] = expected; + } else { + if(expected != actual[posInBlk]){ + String s = "expected=" + expected + " but actual=" + actual[posInBlk] + + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk + + ". group=" + group + ", i=" + i; + Assert.fail(s); + } + } + } + } + + // check parity + verifyParityBlocks(dfs.getConf(), lbs.getLocatedBlocks().get(group) + .getBlockSize(), + BLOCK_STRIPED_CELL_SIZE, dataBlockBytes, parityBlockBytes, killedDnIndex); + } + } + + static void verifyParityBlocks(Configuration conf, final long size, final int cellSize, + byte[][] dataBytes, byte[][] parityBytes, int[] killedDnIndex) { + Arrays.sort(killedDnIndex); + // verify the parity blocks + int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength( + size, cellSize, dataBytes.length, dataBytes.length); + final byte[][] expectedParityBytes = new byte[parityBytes.length][]; + for (int i = 0; i < parityBytes.length; i++) { + expectedParityBytes[i] = new byte[parityBlkSize]; + } + for (int i = 0; i < dataBytes.length; i++) { + if (dataBytes[i] == null) { + dataBytes[i] = new byte[dataBytes[0].length]; + } else if (dataBytes[i].length < dataBytes[0].length) { + final byte[] tmp = dataBytes[i]; + dataBytes[i] = new byte[dataBytes[0].length]; + System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length); + } + } + final RawErasureEncoder encoder = + CodecUtil.createRSRawEncoder(conf, dataBytes.length, parityBytes.length); + encoder.encode(dataBytes, expectedParityBytes); + for (int i = 0; i < parityBytes.length; i++) { + if (Arrays.binarySearch(killedDnIndex, dataBytes.length + i) < 0){ + Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + Arrays.toString(killedDnIndex), + expectedParityBytes[i], parityBytes[i]); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index 0641e8ea65..d78e88b197 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -18,26 +18,14 @@ package org.apache.hadoop.hdfs; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; -import org.apache.hadoop.hdfs.util.StripedBlockUtil; -import org.apache.hadoop.io.erasurecode.CodecUtil; -import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -154,141 +142,15 @@ public class TestDFSStripedOutputStream { + cellSize + 123); } - private byte[] generateBytes(int cnt) { - byte[] bytes = new byte[cnt]; - for (int i = 0; i < cnt; i++) { - bytes[i] = getByte(i); - } - return bytes; - } - - private byte getByte(long pos) { - int mod = 29; - return (byte) (pos % mod + 1); - } - private void testOneFile(String src, int writeBytes) throws Exception { src += "_" + writeBytes; Path testPath = new Path(src); - byte[] bytes = generateBytes(writeBytes); + byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes); DFSTestUtil.writeFile(fs, testPath, new String(bytes)); StripedFileTestUtil.waitBlockGroupsReported(fs, src); - // check file length - FileStatus status = fs.getFileStatus(testPath); - Assert.assertEquals(writeBytes, status.getLen()); - - checkData(src, writeBytes); - } - - void checkData(String src, int writeBytes) throws IOException { - List> blockGroupList = new ArrayList<>(); - LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L); - - for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { - Assert.assertTrue(firstBlock instanceof LocatedStripedBlock); - LocatedBlock[] blocks = StripedBlockUtil. 
- parseStripedBlockGroup((LocatedStripedBlock) firstBlock, - cellSize, dataBlocks, parityBlocks); - List oneGroup = Arrays.asList(blocks); - blockGroupList.add(oneGroup); - } - - // test each block group - for (int group = 0; group < blockGroupList.size(); group++) { - //get the data of this block - List blockList = blockGroupList.get(group); - byte[][] dataBlockBytes = new byte[dataBlocks][]; - byte[][] parityBlockBytes = new byte[parityBlocks][]; - - // for each block, use BlockReader to read data - for (int i = 0; i < blockList.size(); i++) { - LocatedBlock lblock = blockList.get(i); - if (lblock == null) { - continue; - } - ExtendedBlock block = lblock.getBlock(); - byte[] blockBytes = new byte[(int)block.getNumBytes()]; - if (i < dataBlocks) { - dataBlockBytes[i] = blockBytes; - } else { - parityBlockBytes[i - dataBlocks] = blockBytes; - } - - if (block.getNumBytes() == 0) { - continue; - } - - final BlockReader blockReader = BlockReaderTestUtil.getBlockReader( - fs, lblock, 0, block.getNumBytes()); - blockReader.readAll(blockBytes, 0, (int) block.getNumBytes()); - blockReader.close(); - } - - // check if we write the data correctly - for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; - blkIdxInGroup++) { - final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup]; - if (actualBlkBytes == null) { - continue; - } - for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) { - // calculate the position of this byte in the file - long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize, - dataBlocks, posInBlk, blkIdxInGroup) + - group * blockSize * dataBlocks; - Assert.assertTrue(posInFile < writeBytes); - final byte expected = getByte(posInFile); - - String s = "Unexpected byte " + actualBlkBytes[posInBlk] - + ", expect " + expected - + ". 
Block group index is " + group - + ", stripe index is " + posInBlk / cellSize - + ", cell index is " + blkIdxInGroup - + ", byte index is " + posInBlk % cellSize; - Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]); - } - } - - verifyParity(lbs.getLocatedBlocks().get(group).getBlockSize(), - cellSize, dataBlockBytes, parityBlockBytes); - } - } - - void verifyParity(final long size, final int cellSize, - byte[][] dataBytes, byte[][] parityBytes) { - verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1); - } - - static void verifyParity(Configuration conf, final long size, - final int cellSize, byte[][] dataBytes, - byte[][] parityBytes, int killedDnIndex) { - // verify the parity blocks - int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength( - size, cellSize, dataBytes.length, dataBytes.length); - final byte[][] expectedParityBytes = new byte[parityBytes.length][]; - for (int i = 0; i < parityBytes.length; i++) { - expectedParityBytes[i] = new byte[parityBlkSize]; - } - for (int i = 0; i < dataBytes.length; i++) { - if (dataBytes[i] == null) { - dataBytes[i] = new byte[dataBytes[0].length]; - } else if (dataBytes[i].length < dataBytes[0].length) { - final byte[] tmp = dataBytes[i]; - dataBytes[i] = new byte[dataBytes[0].length]; - System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length); - } - } - final RawErasureEncoder encoder = - CodecUtil.createRSRawEncoder(conf, - dataBytes.length, parityBytes.length); - encoder.encode(dataBytes, expectedParityBytes); - for (int i = 0; i < parityBytes.length; i++) { - if (i != killedDnIndex) { - Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex, - expectedParityBytes[i], parityBytes[i]); - } - } + StripedFileTestUtil.checkData(fs, testPath, writeBytes, + new int[]{}, 0); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java index 44a29e671e..f6c25661df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -30,23 +31,18 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import 
org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; @@ -74,6 +70,7 @@ public class TestDFSStripedOutputStreamWithFailure { private static final int FLUSH_POS = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1; + static { System.out.println("NUM_DATA_BLOCKS = " + NUM_DATA_BLOCKS); System.out.println("NUM_PARITY_BLOCKS= " + NUM_PARITY_BLOCKS); @@ -101,6 +98,32 @@ public class TestDFSStripedOutputStreamWithFailure { return lengths; } + private static final int[][] dnIndexSuite = { + {0, 1}, + {0, 5}, + {0, 6}, + {0, 8}, + {1, 5}, + {1, 6}, + {6, 8}, + {0, 1, 2}, + {3, 4, 5}, + {0, 1, 6}, + {0, 5, 6}, + {0, 5, 8}, + {0, 6, 7}, + {5, 6, 7}, + {6, 7, 8}, + }; + + private int[] getKillPositions(int fileLen, int num) { + int[] positions = new int[num]; + for (int i = 0; i < num; i++) { + positions[i] = fileLen * (i + 1) / (num + 1); + } + return positions; + } + private static final List LENGTHS = newLengths(); static int getLength(int i) { @@ -127,41 +150,25 @@ public class TestDFSStripedOutputStreamWithFailure { } } - private static byte getByte(long pos) { - return (byte)pos; - } - private HdfsConfiguration newHdfsConfiguration() { final HdfsConfiguration conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L); + conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); return conf; } - void runTest(final int length) { - final HdfsConfiguration conf = newHdfsConfiguration(); - for (int dn = 0; dn < 9; dn++) { - try { - setup(conf); - runTest(length, dn, false, conf); - } catch (Exception e) { - final String err = "failed, dn=" + dn + ", length=" + length - + StringUtils.stringifyException(e); - LOG.error(err); - Assert.fail(err); - } finally { - tearDown(); - } - } - } - @Test(timeout=240000) public void testDatanodeFailure56() throws Exception { runTest(getLength(56)); } + @Test(timeout=240000) + public void testMultipleDatanodeFailure56() throws Exception { + runTestWithMultipleFailure(getLength(56)); + } + @Test(timeout=240000) public void testBlockTokenExpired() throws Exception { final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE); @@ -174,7 +181,7 @@ public class TestDFSStripedOutputStreamWithFailure { for (int dn = 0; dn < 9; dn += 2) { try { setup(conf); - runTest(length, dn, true, conf); + runTest(length, new int[]{length/2}, new int[]{dn}, true); } catch (Exception e) { LOG.error("failed, dn=" + dn + ", length=" + length); throw e; @@ -214,22 +221,8 @@ public class TestDFSStripedOutputStreamWithFailure { Assert.fail("Failed to validate available dns against blkGroupSize"); } catch (IOException ioe) { // expected - GenericTestUtils.assertExceptionContains("Failed: the number of " - + "remaining blocks = 5 < the number of data blocks = 6", ioe); - DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out - .getWrappedStream(); - - // get leading streamer and verify the last exception - StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0); - try { - datastreamer.getLastException().check(true); - Assert.fail("Failed to validate available dns against blkGroupSize"); - } catch 
(IOException le) { - GenericTestUtils.assertExceptionContains( - "Failed to get datablocks number of nodes from" - + " namenode: blockGroupSize= 9, blocks.length= " - + numDatanodes, le); - } + GenericTestUtils.assertExceptionContains("Failed to get 6 nodes from" + + " namenode: blockGroupSize= 9, blocks.length= 5", ioe); } } finally { tearDown(); @@ -258,42 +251,73 @@ public class TestDFSStripedOutputStreamWithFailure { int fileLength = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE - 1000; final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); DFSTestUtil.writeFile(dfs, srcPath, new String(expected)); + LOG.info("writing finished. Seek and read the file to verify."); StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength); } finally { tearDown(); } } - private void runTest(final int length, final int dnIndex, - final boolean tokenExpire, final HdfsConfiguration conf) { - try { - runTest(length, length/2, dnIndex, tokenExpire, conf); - } catch(Exception e) { - LOG.info("FAILED", e); - Assert.fail(StringUtils.stringifyException(e)); + void runTest(final int length) { + final HdfsConfiguration conf = newHdfsConfiguration(); + for (int dn = 0; dn < 9; dn++) { + try { + setup(conf); + runTest(length, new int[]{length/2}, new int[]{dn}, false); + } catch (Throwable e) { + final String err = "failed, dn=" + dn + ", length=" + length + + StringUtils.stringifyException(e); + LOG.error(err); + Assert.fail(err); + } finally { + tearDown(); + } } } - private void runTest(final int length, final int killPos, - final int dnIndex, final boolean tokenExpire, - final HdfsConfiguration conf) throws Exception { - if (killPos <= FLUSH_POS) { - LOG.warn("killPos=" + killPos + " <= FLUSH_POS=" + FLUSH_POS - + ", length=" + length + ", dnIndex=" + dnIndex); + void runTestWithMultipleFailure(final int length) throws Exception { + final HdfsConfiguration conf = newHdfsConfiguration(); + for(int i=0;i killPos[0], "length=%s <= killPos=%s", + length, killPos); + Preconditions.checkArgument(killPos.length == dnIndex.length); - // start a datanode now, will kill one later - cluster.startDataNodes(conf, 1, true, null, null); - cluster.waitActive(); - - final Path p = new Path(dir, "dn" + dnIndex + "len" + length + "kill" + killPos); + final Path p = new Path(dir, "dn" + Arrays.toString(dnIndex) + + "len" + length + "kill" + Arrays.toString(killPos)); final String fullPath = p.toString(); LOG.info("fullPath=" + fullPath); - if (tokenExpire) { final NameNode nn = cluster.getNameNode(); final BlockManager bm = nn.getNamesystem().getBlockManager(); @@ -308,50 +332,56 @@ public class TestDFSStripedOutputStreamWithFailure { final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream)out.getWrappedStream(); - long oldGS = -1; - boolean killed = false; + long firstGS = -1; // first GS of this block group which never proceeds blockRecovery + long oldGS = -1; // the old GS before bumping + int numKilled=0; for(; pos.get() < length; ) { final int i = pos.getAndIncrement(); - if (i == killPos) { + if (numKilled < killPos.length && i == killPos[numKilled]) { + assertTrue(firstGS != -1); final long gs = getGenerationStamp(stripedOut); - Assert.assertTrue(oldGS != -1); - Assert.assertEquals(oldGS, gs); + if (numKilled == 0) { + assertEquals(firstGS, gs); + } else { + //TODO: implement hflush/hsync and verify gs strict greater than oldGS + assertTrue(gs >= oldGS); + } + oldGS = gs; if (tokenExpire) { DFSTestUtil.flushInternal(stripedOut); waitTokenExpires(out); } - killDatanode(cluster, stripedOut, dnIndex, pos); 
- killed = true; + killDatanode(cluster, stripedOut, dnIndex[numKilled], pos); + numKilled++; } write(out, i); - if (i == FLUSH_POS) { - oldGS = getGenerationStamp(stripedOut); + if (i % BLOCK_GROUP_SIZE == FLUSH_POS) { + firstGS = getGenerationStamp(stripedOut); + oldGS = firstGS; } } out.close(); + assertEquals(dnIndex.length, numKilled); short expectedReported = StripedFileTestUtil.getRealTotalBlockNum(length); - if (length > dnIndex * CELL_SIZE || dnIndex >= NUM_DATA_BLOCKS) { - expectedReported--; + for(int idx :dnIndex) { + if (length > idx * CELL_SIZE || idx >= NUM_DATA_BLOCKS) { + expectedReported--; + } } DFSTestUtil.waitReplication(dfs, p, expectedReported); - Assert.assertTrue(killed); - - // check file length - final FileStatus status = dfs.getFileStatus(p); - Assert.assertEquals(length, status.getLen()); - - checkData(dfs, fullPath, length, dnIndex, oldGS); + cluster.triggerBlockReports(); + StripedFileTestUtil.checkData(dfs, p, length, dnIndex, oldGS); } static void write(FSDataOutputStream out, int i) throws IOException { try { - out.write(getByte(i)); + out.write(StripedFileTestUtil.getByte(i)); } catch(IOException ioe) { throw new IOException("Failed at i=" + i, ioe); } @@ -359,10 +389,10 @@ public class TestDFSStripedOutputStreamWithFailure { static long getGenerationStamp(DFSStripedOutputStream out) throws IOException { + DFSTestUtil.flushBuffer(out); final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp(); LOG.info("getGenerationStamp returns " + gs); return gs; - } static DatanodeInfo getDatanodes(StripedDataStreamer streamer) { @@ -399,106 +429,6 @@ public class TestDFSStripedOutputStreamWithFailure { cluster.stopDataNode(datanode.getXferAddr()); } - static void checkData(DistributedFileSystem dfs, String src, int length, - int killedDnIndex, long oldGS) throws IOException { - List> blockGroupList = new ArrayList<>(); - LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L); - final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1; - Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size()); - - for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { - Assert.assertTrue(firstBlock instanceof LocatedStripedBlock); - - final long gs = firstBlock.getBlock().getGenerationStamp(); - final String s = "gs=" + gs + ", oldGS=" + oldGS; - LOG.info(s); - Assert.assertTrue(s, gs >= oldGS); - - LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( - (LocatedStripedBlock) firstBlock, - CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); - blockGroupList.add(Arrays.asList(blocks)); - } - - // test each block group - for (int group = 0; group < blockGroupList.size(); group++) { - final boolean isLastGroup = group == blockGroupList.size() - 1; - final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE - : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE; - final int numCellInGroup = (groupSize - 1)/CELL_SIZE + 1; - final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS; - final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE; - - //get the data of this block - List blockList = blockGroupList.get(group); - byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][]; - byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][]; - - // for each block, use BlockReader to read data - for (int i = 0; i < blockList.size(); i++) { - final int j = i >= NUM_DATA_BLOCKS? 0: i; - final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS - + (j <= lastCellIndex? 
1: 0); - final int blockSize = numCellInBlock*CELL_SIZE - + (isLastGroup && j == lastCellIndex? lastCellSize - CELL_SIZE: 0); - - final byte[] blockBytes = new byte[blockSize]; - if (i < NUM_DATA_BLOCKS) { - dataBlockBytes[i] = blockBytes; - } else { - parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes; - } - - final LocatedBlock lb = blockList.get(i); - LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock - + ", blockSize=" + blockSize + ", lb=" + lb); - if (lb == null) { - continue; - } - final ExtendedBlock block = lb.getBlock(); - Assert.assertEquals(blockSize, block.getNumBytes()); - - - if (block.getNumBytes() == 0) { - continue; - } - - if (i != killedDnIndex) { - final BlockReader blockReader = BlockReaderTestUtil.getBlockReader( - dfs, lb, 0, block.getNumBytes()); - blockReader.readAll(blockBytes, 0, (int) block.getNumBytes()); - blockReader.close(); - } - } - - // check data - final int groupPosInFile = group*BLOCK_GROUP_SIZE; - for (int i = 0; i < dataBlockBytes.length; i++) { - final byte[] actual = dataBlockBytes[i]; - for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) { - final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG( - CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile; - Assert.assertTrue(posInFile < length); - final byte expected = getByte(posInFile); - - if (i == killedDnIndex) { - actual[posInBlk] = expected; - } else { - String s = "expected=" + expected + " but actual=" + actual[posInBlk] - + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk - + ". group=" + group + ", i=" + i; - Assert.assertEquals(s, expected, actual[posInBlk]); - } - } - } - - // check parity - TestDFSStripedOutputStream.verifyParity(dfs.getConf(), - lbs.getLocatedBlocks().get(group).getBlockSize(), - CELL_SIZE, dataBlockBytes, parityBlockBytes, - killedDnIndex - dataBlockBytes.length); - } - } private void waitTokenExpires(FSDataOutputStream out) throws IOException { Token token = DFSTestUtil.getBlockToken(out); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java index c0dca4e866..764527d907 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java @@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -39,6 +41,12 @@ public class TestWriteStripedFileWithFailure { private static MiniDFSCluster cluster; private static FileSystem fs; private static Configuration conf = new HdfsConfiguration(); + + static { + GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL); + GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL); + } + private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS; private final short parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS; private final int smallFileLength = blockSize * dataBlocks - 123; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 124bf8060e..ef315275f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -1745,7 +1745,7 @@ public class TestBalancer { // verify locations of striped blocks LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); - DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); // add one datanode String newRack = "/rack" + (++numOfRacks); @@ -1761,7 +1761,7 @@ public class TestBalancer { // verify locations of striped blocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); - DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); } finally { cluster.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java index 3a9748f223..7cf56562a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java @@ -488,7 +488,7 @@ public class TestMover { Assert.assertEquals(StorageType.DISK, type); } } - DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, dataBlocks + parityBlocks); // start 5 more datanodes @@ -523,7 +523,7 @@ public class TestMover { Assert.assertEquals(StorageType.ARCHIVE, type); } } - DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, dataBlocks + parityBlocks); }finally{ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java index 64d33a406f..abcdbc10c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; @@ -42,7 +41,6 @@ import java.util.Arrays; import java.util.List; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class TestAddOverReplicatedStripedBlocks { @@ -64,6 +62,7 @@ public class TestAddOverReplicatedStripedBlocks { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); // disable block recovery conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); 
SimulatedFSDataset.setFactory(conf); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster.waitActive(); @@ -118,7 +117,7 @@ public class TestAddOverReplicatedStripedBlocks { // verify that all internal blocks exists lbs = cluster.getNameNodeRpc().getBlockLocations( filePath.toString(), 0, fileLen); - DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE); + StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE); } @Test @@ -162,7 +161,7 @@ public class TestAddOverReplicatedStripedBlocks { // verify that all internal blocks exists lbs = cluster.getNameNodeRpc().getBlockLocations( filePath.toString(), 0, fileLen); - DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1); + StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1); } @Test @@ -216,7 +215,7 @@ public class TestAddOverReplicatedStripedBlocks { // verify that all internal blocks exists lbs = cluster.getNameNodeRpc().getBlockLocations( filePath.toString(), 0, fileLen); - DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE); + StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE); } @Test @@ -248,6 +247,7 @@ public class TestAddOverReplicatedStripedBlocks { // update blocksMap cluster.triggerBlockReports(); + Thread.sleep(2000); // add to invalidates cluster.triggerHeartbeats(); // datanode delete block @@ -259,7 +259,7 @@ public class TestAddOverReplicatedStripedBlocks { // we are left GROUP_SIZE - 1 blocks. lbs = cluster.getNameNodeRpc().getBlockLocations( filePath.toString(), 0, fileLen); - DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1); + StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java index c27ead5f65..735f84dfa1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; @@ -736,7 +737,13 @@ public class TestRetryCacheWithHA { DatanodeInfo[] newNodes = new DatanodeInfo[2]; newNodes[0] = nodes[0]; newNodes[1] = nodes[1]; - String[] storageIDs = {"s0", "s1"}; + final DatanodeManager dm = cluster.getNamesystem(0).getBlockManager() + .getDatanodeManager(); + final String storageID1 = dm.getDatanode(newNodes[0]).getStorageInfos()[0] + .getStorageID(); + final String storageID2 = dm.getDatanode(newNodes[1]).getStorageInfos()[0] + .getStorageID(); + String[] storageIDs = {storageID1, storageID2}; client.getNamenode().updatePipeline(client.getClientName(), oldBlock, newBlock, newNodes, storageIDs);
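
The heart of this patch is that DFSStripedOutputStream now drives block-group allocation and pipeline recovery centrally, while each StripedDataStreamer only polls its own slot of the coordinator's indexed queues (getFollowingBlocks().poll(index), getNewBlocks().take(index)). As a rough illustration of that fan-out pattern only, here is a minimal self-contained Java sketch. CoordinatorSketch, MultiQueue, and the string payloads are invented placeholder names, not Hadoop classes, and the real Coordinator additionally tracks streamer health and propagates failures, which this sketch omits.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CoordinatorSketch {

  // One blocking queue per streamer index, mirroring the idea of an
  // indexed queue shared between the output stream and its streamers.
  static class MultiQueue<T> {
    private final List<BlockingQueue<T>> queues;

    MultiQueue(int numQueues) {
      queues = new ArrayList<>(numQueues);
      for (int i = 0; i < numQueues; i++) {
        queues.add(new LinkedBlockingQueue<T>());
      }
    }

    void offer(int i, T item) {                  // called by the coordinator
      queues.get(i).offer(item);
    }

    T take(int i) throws InterruptedException {  // called by streamer #i
      return queues.get(i).take();
    }
  }

  public static void main(String[] args) throws Exception {
    final int numStreamers = 9;                  // 6 data + 3 parity
    final MultiQueue<String> followingBlocks = new MultiQueue<>(numStreamers);

    // Streamer threads block until the coordinator hands them their block.
    List<Thread> streamers = new ArrayList<>();
    for (int i = 0; i < numStreamers; i++) {
      final int index = i;
      Thread t = new Thread(() -> {
        try {
          String block = followingBlocks.take(index);
          System.out.println("streamer #" + index + " got " + block);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      t.start();
      streamers.add(t);
    }

    // The output stream allocates the block group once and fans it out.
    for (int i = 0; i < numStreamers; i++) {
      followingBlocks.offer(i, "internalBlock-" + i);
    }
    for (Thread t : streamers) {
      t.join();
    }
  }
}

Running the sketch prints one line per streamer once every internal block has been offered, mirroring how the follow-up block used to be fetched lazily by whichever streamer polled first (the removed ConcurrentPoll) and is now populated once by the output stream.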
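
StripedBlockUtil.checkBlocks now validates each internal block directly against its block group rather than against a sibling block: internal block i must carry the group's block ID plus i and the group's generation stamp (and pool ID, omitted below). A tiny standalone sketch of that invariant, with made-up class and method names:

public class BlockIdCheckSketch {

  // Throws if internal block i does not belong to the given block group.
  static void check(long groupId, long groupGs, int i, long blockId, long gs) {
    if (blockId - i != groupId) {
      throw new IllegalStateException("Block IDs mismatched: i=" + i);
    }
    if (gs != groupGs) {
      throw new IllegalStateException("Generation stamps mismatched: i=" + i);
    }
  }

  public static void main(String[] args) {
    final long groupId = 1000L;       // arbitrary sample values
    final long groupGs = 1001L;
    check(groupId, groupGs, 3, groupId + 3, groupGs);   // passes
    System.out.println("internal block 3 belongs to group " + groupId);
  }
}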
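
The densest part of the new StripedFileTestUtil.checkData is the per-internal-block size arithmetic (numCellInGroup, lastCellIndex, lastCellSize). The following standalone sketch reproduces that arithmetic with the same cell size and 6-data/3-parity layout these tests assume, so the expected sizes for a partially filled last block group can be checked by hand; the class and helper names are illustrative only, not test utilities from the patch.

public class StripeGeometrySketch {

  // Expected on-disk length of internal block blockIndexInGroup for a block
  // group holding groupSize bytes of data, following the same formula as the
  // checkData loop above. Parity blocks (index >= numData) are as long as
  // data block 0.
  static int internalBlockSize(int groupSize, int cellSize, int numData,
      int blockIndexInGroup, boolean isLastGroup) {
    final int j = blockIndexInGroup >= numData ? 0 : blockIndexInGroup;
    final int numCellInGroup = (groupSize - 1) / cellSize + 1;
    final int lastCellIndex = (numCellInGroup - 1) % numData;
    final int lastCellSize = groupSize - (numCellInGroup - 1) * cellSize;
    final int numCellInBlock = (numCellInGroup - 1) / numData
        + (j <= lastCellIndex ? 1 : 0);
    return numCellInBlock * cellSize
        + (isLastGroup && j == lastCellIndex ? lastCellSize - cellSize : 0);
  }

  public static void main(String[] args) {
    final int cellSize = 64 * 1024;
    final int numData = 6;
    // A last group holding 2.5 cells of data: blocks 0 and 1 get a full cell,
    // block 2 gets the partial cell, blocks 3..5 get nothing.
    final int groupSize = 2 * cellSize + cellSize / 2;
    for (int i = 0; i < numData; i++) {
      System.out.println("data block " + i + " -> "
          + internalBlockSize(groupSize, cellSize, numData, i, true) + " bytes");
    }
  }
}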