From 5f15084bd530865d3e2641b709665b5b7971a74d Mon Sep 17 00:00:00 2001
From: Jing Zhao <jing9@apache.org>
Date: Tue, 2 Jun 2015 16:14:08 -0700
Subject: [PATCH] HDFS-8453. Erasure coding: properly handle start offset for
 internal blocks in a block group. Contributed by Zhe Zhang.

---
 .../hadoop/hdfs/protocol/HdfsConstants.java   |  2 +-
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt      |  3 ++
 .../apache/hadoop/hdfs/DFSInputStream.java    | 48 +++++++++++--------
 .../hadoop/hdfs/DFSStripedInputStream.java    | 45 ++++++++---------
 .../hadoop/hdfs/util/StripedBlockUtil.java    |  5 +-
 .../hdfs/TestDFSStripedInputStream.java       |  4 +-
 .../hdfs/TestReadStripedFileWithDecoding.java | 24 ++++++----
 7 files changed, 72 insertions(+), 59 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index 32ca81c326..a527e233de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -84,7 +84,7 @@ public final class HdfsConstants {
   public static final byte NUM_DATA_BLOCKS = 6;
   public static final byte NUM_PARITY_BLOCKS = 3;
   // The chunk size for striped block which is used by erasure coding
-  public static final int BLOCK_STRIPED_CELL_SIZE = 256 * 1024;
+  public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
 
   // SafeMode actions
   public enum SafeModeAction {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index fa0a8e2c54..278f897e42 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -274,3 +274,6 @@
 
     HDFS-8517. Fix a decoding issue in stripped block recovering in client side.
     (Kai Zheng via jing9)
+
+    HDFS-8453. Erasure coding: properly handle start offset for internal blocks
+    in a block group. (Zhe Zhang via jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 0d51a5752d..6102edfe92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1016,7 +1016,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         }
         deadNodes.clear(); //2nd option is to remove only nodes[blockId]
         openInfo();
-        block = getBlockAt(block.getStartOffset());
+        block = refreshLocatedBlock(block);
         failures++;
       }
     }
@@ -1088,15 +1088,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     return errMsgr.toString();
   }
 
-  protected void fetchBlockByteRange(long blockStartOffset, long start, long end,
+  protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
       try {
-        actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
+        actualGetFromOneDataNode(addressPair, block, start, end,
             buf, offset, corruptedBlockMap);
         return;
       } catch (IOException e) {
@@ -1107,7 +1107,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
-      final long blockStartOffset, final long start, final long end,
+      final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       final int hedgedReadId) {
@@ -1120,7 +1120,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         TraceScope scope =
             Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
         try {
-          actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+          actualGetFromOneDataNode(datanode, block, start, end, buf,
               offset, corruptedBlockMap);
           return bb;
         } finally {
@@ -1134,18 +1134,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Used when reading contiguous blocks
    */
   private void actualGetFromOneDataNode(final DNAddrPair datanode,
-      long blockStartOffset, final long start, final long end, byte[] buf,
+      LocatedBlock block, final long start, final long end, byte[] buf,
       int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     final int length = (int) (end - start + 1);
-    actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+    actualGetFromOneDataNode(datanode, block, start, end, buf,
         new int[]{offset}, new int[]{length}, corruptedBlockMap);
   }
 
   /**
    * Read data from one DataNode.
    * @param datanode the datanode from which to read data
-   * @param blockStartOffset starting offset in the file
+   * @param block the located block containing the requested data
    * @param startInBlk the startInBlk offset of the block
    * @param endInBlk the endInBlk offset of the block
    * @param buf the given byte array into which the data is read
@@ -1157,7 +1157,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    *                          block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode,
-      long blockStartOffset, final long startInBlk, final long endInBlk,
+      LocatedBlock block, final long startInBlk, final long endInBlk,
       byte[] buf, int[] offsets, int[] lengths,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1171,7 +1171,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
-      LocatedBlock block = getBlockAt(blockStartOffset);
+      block = refreshLocatedBlock(block);
       BlockReader reader = null;
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
@@ -1227,6 +1227,17 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
+  /**
+   * Refresh cached block locations.
+   * @param block The currently cached block locations
+   * @return Refreshed block locations
+   * @throws IOException
+   */
+  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+      throws IOException {
+    return getBlockAt(block.getStartOffset());
+  }
+
   /**
    * This method verifies that the read portions are valid and do not overlap
    * with each other.
@@ -1250,7 +1261,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * 'hedged' read if the first read is taking longer than configured amount of
    * time. We then wait on which ever read returns first.
    */
-  private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
+  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1263,7 +1274,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     int hedgedReadId = 0;
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       // see HDFS-6591, this metric is used to verify/catch unnecessary loops
       hedgedReadOpsLoopNumForTesting++;
@@ -1275,7 +1286,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-            chosenNode, block.getStartOffset(), start, end, bb,
+            chosenNode, block, start, end, bb,
             corruptedBlockMap, hedgedReadId++);
         Future<ByteBuffer> firstRequest = hedgedService
             .submit(getFromDataNodeCallable);
@@ -1312,7 +1323,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           }
           bb = ByteBuffer.allocate(len);
           Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-              chosenNode, block.getStartOffset(), start, end, bb,
+              chosenNode, block, start, end, bb,
               corruptedBlockMap, hedgedReadId++);
           Future<ByteBuffer> oneMoreRequest = hedgedService
               .submit(getFromDataNodeCallable);
@@ -1466,12 +1477,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
         if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
-          hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
+          hedgedFetchBlockByteRange(blk, targetStart,
               targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
         } else {
-          fetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
         }
       } finally {
         // Check and report if any block replicas are corrupted.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 228368ba49..2e26cca094 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.ByteBufferPool;
 
@@ -470,22 +471,17 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   /**
-   * | <--------- LocatedStripedBlock (ID = 0) ---------> |
-   * LocatedBlock (0) | LocatedBlock (1) | LocatedBlock (2)
-   *                      ^
-   *                    offset
-   * On a striped file, the super method {@link DFSInputStream#getBlockAt}
-   * treats a striped block group as a single {@link LocatedBlock} object,
-   * which includes target in its range. This method adds the logic of:
-   *   1. Analyzing the index of required block based on offset
-   *   2. Parsing the block group to obtain the block location on that index
+   * The super method {@link DFSInputStream#refreshLocatedBlock} refreshes
+   * cached LocatedBlock by executing {@link DFSInputStream#getBlockAt} again.
+   * This method extends the logic by first remembering the index of the
+   * internal block, and re-parsing the refreshed block group with the same
+   * index.
    */
   @Override
-  protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
-    LocatedBlock lb = getBlockGroupAt(blkStartOffset);
-
-    int idx = (int) ((blkStartOffset - lb.getStartOffset())
-        % (dataBlkNum + parityBlkNum));
+  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+      throws IOException {
+    int idx = BlockIdManager.getBlockIndex(block.getBlock().getLocalBlock());
+    LocatedBlock lb = getBlockGroupAt(block.getStartOffset());
     // If indexing information is returned, iterate through the index array
     // to find the entry for position idx in the group
     LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
@@ -496,10 +492,11 @@ public class DFSStripedInputStream extends DFSInputStream {
       }
     }
     if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
-          + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
+      DFSClient.LOG.debug("refreshLocatedBlock for striped blocks, offset="
+          + block.getStartOffset() + ". Obtained block " + lb + ", idx=" + idx);
     }
-    return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize, dataBlkNum, idx);
+    return StripedBlockUtil.constructInternalBlock(
+        lsb, i, cellSize, dataBlkNum, idx);
   }
 
   private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
@@ -513,12 +510,12 @@ public class DFSStripedInputStream extends DFSInputStream {
    * Real implementation of pread.
    */
   @Override
-  protected void fetchBlockByteRange(long blockStartOffset, long start,
+  protected void fetchBlockByteRange(LocatedBlock block, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     // Refresh the striped block group
-    LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
+    LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
 
     AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, cellSize,
         blockGroup, start, end, buf, offset);
@@ -622,9 +619,9 @@ public class DFSStripedInputStream extends DFSInputStream {
     StripingChunk chunk = alignedStripe.chunks[index];
     chunk.state = StripingChunk.PENDING;
     Callable<Void> readCallable = getFromOneDataNode(dnAddr,
-        block.getStartOffset(), alignedStripe.getOffsetInBlock(),
-        alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1, chunk.buf,
-        chunk.getOffsets(), chunk.getLengths(),
+        block, alignedStripe.getOffsetInBlock(),
+        alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1,
+        chunk.buf, chunk.getOffsets(), chunk.getLengths(),
         corruptedBlockMap, index);
     Future<Void> getFromDNRequest = service.submit(readCallable);
     if (DFSClient.LOG.isDebugEnabled()) {
@@ -637,7 +634,7 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
-      final long blockStartOffset, final long start, final long end,
+      final LocatedBlock block, final long start, final long end,
       final byte[] buf, final int[] offsets, final int[] lengths,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       final int hedgedReadId) {
@@ -648,7 +645,7 @@ public class DFSStripedInputStream extends DFSInputStream {
         TraceScope scope =
             Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
         try {
-          actualGetFromOneDataNode(datanode, blockStartOffset, start,
+          actualGetFromOneDataNode(datanode, block, start,
               end, buf, offsets, lengths, corruptedBlockMap);
         } finally {
           scope.close();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 80321ef959..1db2045741 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -105,16 +105,15 @@ public class StripedBlockUtil {
     final ExtendedBlock blk = constructInternalBlock(
         bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
 
-    final long offset = bg.getStartOffset() + idxInBlockGroup * (long) cellSize;
     if (idxInReturnedLocs < bg.getLocations().length) {
       return new LocatedBlock(blk,
           new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
           new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
           new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
-          offset, bg.isCorrupt(), null);
+          bg.getStartOffset(), bg.isCorrupt(), null);
     } else {
       return new LocatedBlock(blk, null, null, null,
-          offset, bg.isCorrupt(), null);
+          bg.getStartOffset(), bg.isCorrupt(), null);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index b64e690e1e..de43441d67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -87,7 +87,7 @@ public class TestDFSStripedInputStream {
    * Test {@link DFSStripedInputStream#getBlockAt(long)}
    */
   @Test
-  public void testGetBlock() throws Exception {
+  public void testRefreshBlock() throws Exception {
     final int numBlocks = 4;
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
         NUM_STRIPE_PER_BLOCK, false);
@@ -102,7 +102,7 @@ public class TestDFSStripedInputStream {
       LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
           CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
       for (int j = 0; j < DATA_BLK_NUM; j++) {
-        LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
+        LocatedBlock refreshed = in.refreshLocatedBlock(blks[j]);
         assertEquals(blks[j].getBlock(), refreshed.getBlock());
         assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
         assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index 7397caf470..a28f88ef8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -59,20 +59,24 @@ public class TestReadStripedFileWithDecoding {
 
   @Test
   public void testWritePreadWithDNFailure1() throws IOException {
-    testWritePreadWithDNFailure("/foo", 0);
+    testWritePreadWithDNFailure("/foo", cellSize * (dataBlocks + 2), 0);
   }
 
   @Test
   public void testWritePreadWithDNFailure2() throws IOException {
-    testWritePreadWithDNFailure("/foo", cellSize * 5);
+    testWritePreadWithDNFailure("/foo", cellSize * (dataBlocks + 2), cellSize * 5);
   }
 
-  private void testWritePreadWithDNFailure(String file, int startOffsetInFile)
+  @Test
+  public void testWritePreadWithDNFailure3() throws IOException {
+    testWritePreadWithDNFailure("/foo", cellSize * dataBlocks, 0);
+  }
+
+  private void testWritePreadWithDNFailure(String file, int fileSize, int startOffsetInFile)
       throws IOException {
     final int failedDNIdx = 2;
-    final int length = cellSize * (dataBlocks + 2);
     Path testPath = new Path(file);
-    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    final byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
     DFSTestUtil.writeFile(fs, testPath, bytes);
 
     // shut down the DN that holds the last internal data block
@@ -89,17 +93,17 @@ public class TestReadStripedFileWithDecoding {
 
     // pread
     try (FSDataInputStream fsdis = fs.open(testPath)) {
-      byte[] buf = new byte[length];
+      byte[] buf = new byte[fileSize];
       int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
-      Assert.assertEquals("The length of file should be the same to write size",
-          length - startOffsetInFile, readLen);
+      Assert.assertEquals("The fileSize of file should be the same to write size",
+          fileSize - startOffsetInFile, readLen);
 
       byte[] expected = new byte[readLen];
-      for (int i = startOffsetInFile; i < length; i++) {
+      for (int i = startOffsetInFile; i < fileSize; i++) {
         expected[i - startOffsetInFile] = StripedFileTestUtil.getByte(i);
       }
 
-      for (int i = startOffsetInFile; i < length; i++) {
+      for (int i = startOffsetInFile; i < fileSize; i++) {
         Assert.assertEquals("Byte at " + i + " should be the same",
             expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
       }