From ccd2ac60ecc5fccce56debf21a068e663c1d5f11 Mon Sep 17 00:00:00 2001
From: Andrew Wang
Date: Tue, 5 Sep 2017 14:16:03 -0700
Subject: [PATCH] HDFS-11882. Precisely calculate acked length of striped
 block groups in updatePipeline.

---
 .../hadoop/hdfs/DFSStripedOutputStream.java    | 160 ++++++++++++++++--
 ...TestDFSStripedOutputStreamWithFailure.java  |  77 ++++++++-
 2 files changed, 222 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index d5206d1c09..408b3256df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -641,7 +641,7 @@ private void checkStreamerFailures() throws IOException {
       // wait till all the healthy streamers to
       // 1) get the updated block info
       // 2) create new block outputstream
-      newFailed = waitCreatingNewStreams(healthySet);
+      newFailed = waitCreatingStreamers(healthySet);
       if (newFailed.size() + failedStreamers.size() >
           numAllBlocks - numDataBlocks) {
         throw new IOException(
@@ -668,6 +668,14 @@ private void checkStreamerFailures() throws IOException {
     }
   }
 
+  /**
+   * Check if the streamers were successfully updated, adding the streamers
+   * that failed to the {@code failed} return parameter.
+   * @param failed return parameter collecting the streamers that failed to
+   *               update
+   * @param streamers set of streamers that are being updated
+   * @return total number of successful and failed updates
+   */
   private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
       Set<StripedDataStreamer> streamers) {
     for (StripedDataStreamer streamer : streamers) {
@@ -682,7 +690,15 @@ private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
     return coordinator.updateStreamerMap.size() + failed.size();
   }
 
-  private Set<StripedDataStreamer> waitCreatingNewStreams(
+  /**
+   * Waits for streamers to be created.
+   *
+   * @param healthyStreamers Set of healthy streamers
+   * @return Set of streamers that failed.
+   *
+   * @throws IOException
+   */
+  private Set<StripedDataStreamer> waitCreatingStreamers(
       Set<StripedDataStreamer> healthyStreamers) throws IOException {
     Set<StripedDataStreamer> failed = new HashSet<>();
     final int expectedNum = healthyStreamers.size();
@@ -773,9 +789,10 @@ private void updatePipeline(ExtendedBlock newBG) throws IOException {
       }
     }
 
-    // should update the block group length based on the acked length
+    // Update the NameNode with the acked length of the block group
+    // Save and restore the unacked length
     final long sentBytes = currentBlockGroup.getNumBytes();
-    final long ackedBytes = getNumAckedStripes() * cellSize * numDataBlocks;
+    final long ackedBytes = getAckedLength();
     Preconditions.checkState(ackedBytes <= sentBytes,
         "Acked:" + ackedBytes + ", Sent:" + sentBytes);
     currentBlockGroup.setNumBytes(ackedBytes);
@@ -787,23 +804,140 @@ private void updatePipeline(ExtendedBlock newBG) throws IOException {
   }
 
   /**
-   * Get the number of acked stripes. An acked stripe means at least data block
-   * number size cells of the stripe were acked.
+   * Return the length of each block in the block group.
+   * Unhealthy blocks have a length of -1.
+   *
+   * @return List of block lengths.
    */
-  private long getNumAckedStripes() {
-    int minStripeNum = Integer.MAX_VALUE;
+  private List<Long> getBlockLengths() {
+    List<Long> blockLengths = new ArrayList<>(numAllBlocks);
     for (int i = 0; i < numAllBlocks; i++) {
       final StripedDataStreamer streamer = getStripedDataStreamer(i);
+      long numBytes = -1;
       if (streamer.isHealthy()) {
-        int curStripeNum = 0;
         if (streamer.getBlock() != null) {
-          curStripeNum = (int) (streamer.getBlock().getNumBytes() / cellSize);
+          numBytes = streamer.getBlock().getNumBytes();
         }
-        minStripeNum = Math.min(curStripeNum, minStripeNum);
+      }
+      blockLengths.add(numBytes);
+    }
+    return blockLengths;
+  }
+
+  /**
+   * Get the length of acked bytes in the block group.
+   *
+   * <p>
+   *   A full stripe is acked when at least numDataBlocks streamers have
+   *   the corresponding cells of the stripe, and all previous full stripes are
+   *   also acked. This enforces the constraint that there is at most one
+   *   partial stripe.
+   * </p>
+   * <p>
+   *   Partial stripes write all parity cells. Empty data cells are not written.
+   *   Parity cells are the length of the longest data cell(s). For example,
+   *   with RS(3,2), if we have data cells with lengths [1MB, 64KB, 0], the
+   *   parity blocks will be length [1MB, 1MB].
+   * </p>
+   * <p>
+   *   To be considered acked, a partial stripe needs at least numDataBlocks
+   *   empty or written cells.
+   * </p>
+   * <p>
+   *   Currently, partial stripes can only happen when closing the file at a
+   *   non-stripe boundary, but this could also happen during (currently
+   *   unimplemented) hflush/hsync support.
+   * </p>
+   */
+  private long getAckedLength() {
+    // Determine the number of full stripes that are sufficiently durable
+    final long sentBytes = currentBlockGroup.getNumBytes();
+    final long numFullStripes = sentBytes / numDataBlocks / cellSize;
+    final long fullStripeLength = numFullStripes * numDataBlocks * cellSize;
+    assert fullStripeLength <= sentBytes : "Full stripe length can't be " +
+        "greater than the block group length";
+
+    long ackedLength = 0;
+
+    // Determine the length contained by at least `numDataBlocks` blocks.
+    // Since it's sorted, all the blocks after `offset` are at least as long,
+    // and there are at least `numDataBlocks` at or after `offset`.
+    List<Long> blockLengths = Collections.unmodifiableList(getBlockLengths());
+    List<Long> sortedBlockLengths = new ArrayList<>(blockLengths);
+    Collections.sort(sortedBlockLengths);
+    if (numFullStripes > 0) {
+      final int offset = sortedBlockLengths.size() - numDataBlocks;
+      ackedLength = sortedBlockLengths.get(offset) * numDataBlocks;
+    }
+
+    // If the acked length is less than the expected full stripe length, then
+    // we're missing a full stripe. Return the acked length.
+    if (ackedLength < fullStripeLength) {
+      return ackedLength;
+    }
+    // If the expected length is exactly a stripe boundary, then we're also done
+    if (ackedLength == sentBytes) {
+      return ackedLength;
+    }
+
+    /*
+      Otherwise, we're potentially dealing with a partial stripe.
+      The partial stripe is laid out as follows:
+
+      0 or more full data cells, `cellSize` in length.
+      0 or 1 partial data cells.
+      0 or more empty data cells.
+      `numParityBlocks` parity cells, the length of the longest data cell.
+
+      If the partial stripe is sufficiently acked, we'll update the ackedLength.
+    */
+
+    // How many full and empty data cells do we expect?
+    final int numFullDataCells = (int)
+        ((sentBytes - fullStripeLength) / cellSize);
+    final int partialLength = (int) (sentBytes - fullStripeLength) % cellSize;
+    final int numPartialDataCells = partialLength == 0 ? 0 : 1;
+    final int numEmptyDataCells = numDataBlocks - numFullDataCells -
+        numPartialDataCells;
+    // Calculate the expected length of the parity blocks.
+    final int parityLength = numFullDataCells > 0 ? cellSize : partialLength;
+
+    final long fullStripeBlockOffset = fullStripeLength / numDataBlocks;
+
+    // Iterate through each type of streamers, checking the expected length.
+    long[] expectedBlockLengths = new long[numAllBlocks];
+    int idx = 0;
+    // Full cells
+    for (; idx < numFullDataCells; idx++) {
+      expectedBlockLengths[idx] = fullStripeBlockOffset + cellSize;
+    }
+    // Partial cell
+    for (; idx < numFullDataCells + numPartialDataCells; idx++) {
+      expectedBlockLengths[idx] = fullStripeBlockOffset + partialLength;
+    }
+    // Empty cells
+    for (; idx < numFullDataCells + numPartialDataCells + numEmptyDataCells;
+         idx++) {
+      expectedBlockLengths[idx] = fullStripeBlockOffset;
+    }
+    // Parity cells
+    for (; idx < numAllBlocks; idx++) {
+      expectedBlockLengths[idx] = fullStripeBlockOffset + parityLength;
+    }
+
+    // Check expected lengths against actual streamer lengths.
+    // Update if we have sufficient durability.
+    int numBlocksWithCorrectLength = 0;
+    for (int i = 0; i < numAllBlocks; i++) {
+      if (blockLengths.get(i) == expectedBlockLengths[i]) {
+        numBlocksWithCorrectLength++;
       }
     }
-    assert minStripeNum != Integer.MAX_VALUE;
-    return minStripeNum;
+    if (numBlocksWithCorrectLength >= numDataBlocks) {
+      ackedLength = sentBytes;
+    }
+
+    return ackedLength;
   }
 
   private int stripeDataSize() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 9915a2fb6d..f63a353815 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -45,7 +45,6 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -61,6 +60,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 /**
  * Test striped file write operation with data node failures.
@@ -390,6 +390,79 @@ void runTestWithMultipleFailure(final int length) throws Exception {
     }
   }
 
+  /**
+   * Test the case where the two DataNodes holding partial data blocks fail.
+   */
+  @Test
+  public void runTestWithDifferentLengths() throws Exception {
+    assumeTrue("Skip this test case in the subclasses. Once is enough.",
+        this.getClass().equals(TestDFSStripedOutputStreamWithFailure.class));
+
+    final HdfsConfiguration conf = newHdfsConfiguration();
+
+    final int[] fileLengths = {
+        // Full stripe then partial on cell boundary
+        cellSize * (dataBlocks * 2 - 2),
+        // Full stripe and a partial on non-cell boundary
+        (cellSize * dataBlocks) + 123,
+    };
+    try {
+      for (int length: fileLengths) {
+        // select the two DNs with partial block to kill
+        final int[] dnIndex = {dataBlocks - 2, dataBlocks - 1};
+        final int[] killPos = getKillPositions(length, dnIndex.length);
+        try {
+          LOG.info("runTestWithDifferentLengths: length==" + length
+              + ", killPos=" + Arrays.toString(killPos)
+              + ", dnIndex=" + Arrays.toString(dnIndex));
+          setup(conf);
+          runTest(length, killPos, dnIndex, false);
+        } catch (Throwable e) {
+          final String err = "failed, killPos=" + Arrays.toString(killPos)
+              + ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
+          LOG.error(err);
+          throw e;
+        }
+      }
+    } finally {
+      tearDown();
+    }
+  }
+
+  /**
+   * Test writing very short EC files with many failures.
+   */
+  @Test
+  public void runTestWithShortStripe() throws Exception {
+    assumeTrue("Skip this test case in the subclasses. Once is enough.",
+        this.getClass().equals(TestDFSStripedOutputStreamWithFailure.class));
+
+    final HdfsConfiguration conf = newHdfsConfiguration();
+    // Write a file with a 1 cell partial stripe
+    final int length = cellSize - 123;
+    // Kill all but one DN
+    final int[] dnIndex = new int[dataBlocks + parityBlocks - 1];
+    for (int i = 0; i < dnIndex.length; i++) {
+      dnIndex[i] = i;
+    }
+    final int[] killPos = getKillPositions(length, dnIndex.length);
+
+    try {
+      LOG.info("runTestWithShortStripe: length==" + length + ", killPos="
+          + Arrays.toString(killPos) + ", dnIndex="
+          + Arrays.toString(dnIndex));
+      setup(conf);
+      runTest(length, killPos, dnIndex, false);
+    } catch (Throwable e) {
+      final String err = "failed, killPos=" + Arrays.toString(killPos)
+          + ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
+      LOG.error(err);
+      throw e;
+    } finally {
+      tearDown();
+    }
+  }
+
   /**
    * runTest implementation.
    * @param length file length
@@ -558,7 +631,7 @@ int getBase() {
 
   private void run(int offset) {
     int base = getBase();
-    Assume.assumeTrue(base >= 0);
+    assumeTrue(base >= 0);
    final int i = offset + base;
    final Integer length = getLength(i);
    if (length == null) {