diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
index 2c3329e47a..9d6ab9a102 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
@@ -8,6 +8,7 @@
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3506d3ac70..88bd21909d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1704,7 +1704,10 @@ public DataEncryptionKey newDataEncryptionKey() throws IOException {
/**
* Get the checksum of the whole file or a range of the file. Note that the
- * range always starts from the beginning of the file.
+ * range always starts from the beginning of the file. The file can be
+ * in replicated form, or striped mode. It can be used to checksum and compare
+ * two replicated files, or two striped files, but not applicable for two
+ * files of different block layout forms.
* @param src The file path
* @param length the length of the range, i.e., the range is [0, length]
* @return The checksum
@@ -1717,7 +1720,11 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
LocatedBlocks blockLocations = getBlockLocations(src, length);
- FileChecksumHelper.FileChecksumComputer maker =
+ FileChecksumHelper.FileChecksumComputer maker;
+ ErasureCodingPolicy ecPolicy = blockLocations.getErasureCodingPolicy();
+ maker = ecPolicy != null ?
+ new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src,
+ length, blockLocations, namenode, this, ecPolicy) :
new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
blockLocations, namenode, this);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
index d15db9f11c..dfd939397b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -22,10 +22,13 @@
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
@@ -75,7 +78,7 @@ static abstract class FileChecksumComputer {
private int bytesPerCRC = -1;
private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
private long crcPerBlock = 0;
- private boolean refetchBlocks = false;
+ private boolean isRefetchBlocks = false;
private int lastRetriedIndex = -1;
/**
@@ -127,8 +130,11 @@ LocatedBlocks getBlockLocations() {
return blockLocations;
}
- void setBlockLocations(LocatedBlocks blockLocations) {
- this.blockLocations = blockLocations;
+ void refetchBlocks() throws IOException {
+ this.blockLocations = getClient().getBlockLocations(getSrc(),
+ getLength());
+ this.locatedBlocks = getBlockLocations().getLocatedBlocks();
+ this.isRefetchBlocks = false;
}
int getTimeout() {
@@ -143,10 +149,6 @@ List getLocatedBlocks() {
return locatedBlocks;
}
- void setLocatedBlocks(List locatedBlocks) {
- this.locatedBlocks = locatedBlocks;
- }
-
long getRemaining() {
return remaining;
}
@@ -180,11 +182,11 @@ void setCrcPerBlock(long crcPerBlock) {
}
boolean isRefetchBlocks() {
- return refetchBlocks;
+ return isRefetchBlocks;
}
void setRefetchBlocks(boolean refetchBlocks) {
- this.refetchBlocks = refetchBlocks;
+ this.isRefetchBlocks = refetchBlocks;
}
int getLastRetriedIndex() {
@@ -278,10 +280,7 @@ void checksumBlocks() throws IOException {
blockIdx < getLocatedBlocks().size() && getRemaining() >= 0;
blockIdx++) {
if (isRefetchBlocks()) { // refetch to get fresh tokens
- setBlockLocations(getClient().getBlockLocations(getSrc(),
- getLength()));
- setLocatedBlocks(getBlockLocations().getLocatedBlocks());
- setRefetchBlocks(false);
+ refetchBlocks();
}
LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
@@ -380,15 +379,13 @@ private void tryDatanode(LocatedBlock locatedBlock,
}
//read md5
- final MD5Hash md5 = new MD5Hash(
- checksumData.getMd5().toByteArray());
+ final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
md5.write(getMd5out());
// read crc-type
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
- ct = PBHelperClient.convert(checksumData
- .getCrcType());
+ ct = PBHelperClient.convert(checksumData.getCrcType());
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte");
@@ -413,4 +410,160 @@ && getCrcType() != ct) {
}
}
}
+
+ /**
+ * Striped file checksum computing.
+ */
+ static class StripedFileNonStripedChecksumComputer
+ extends FileChecksumComputer {
+ private final ErasureCodingPolicy ecPolicy;
+ private int bgIdx;
+
+ StripedFileNonStripedChecksumComputer(String src, long length,
+ LocatedBlocks blockLocations,
+ ClientProtocol namenode,
+ DFSClient client,
+ ErasureCodingPolicy ecPolicy)
+ throws IOException {
+ super(src, length, blockLocations, namenode, client);
+
+ this.ecPolicy = ecPolicy;
+ }
+
+ @Override
+ void checksumBlocks() throws IOException {
+ int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout();
+ setTimeout(tmpTimeout);
+
+ for (bgIdx = 0;
+ bgIdx < getLocatedBlocks().size() && getRemaining() >= 0; bgIdx++) {
+ if (isRefetchBlocks()) { // refetch to get fresh tokens
+ refetchBlocks();
+ }
+
+ LocatedBlock locatedBlock = getLocatedBlocks().get(bgIdx);
+ LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
+
+ if (!checksumBlockGroup(blockGroup)) {
+ throw new IOException("Fail to get block MD5 for " + locatedBlock);
+ }
+ }
+ }
+
+
+ private boolean checksumBlockGroup(
+ LocatedStripedBlock blockGroup) throws IOException {
+ ExtendedBlock block = blockGroup.getBlock();
+ if (getRemaining() < block.getNumBytes()) {
+ block.setNumBytes(getRemaining());
+ }
+ setRemaining(getRemaining() - block.getNumBytes());
+
+ StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
+ blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy);
+ DatanodeInfo[] datanodes = blockGroup.getLocations();
+
+ //try each datanode in the block group.
+ boolean done = false;
+ for (int j = 0; !done && j < datanodes.length; j++) {
+ try {
+ tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]);
+ done = true;
+ } catch (InvalidBlockTokenException ibte) {
+ if (bgIdx > getLastRetriedIndex()) {
+ LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ + "for file {} for block {} from datanode {}. Will retry "
+ + "the block once.",
+ getSrc(), block, datanodes[j]);
+ setLastRetriedIndex(bgIdx);
+ done = true; // actually it's not done; but we'll retry
+ bgIdx--; // repeat at bgIdx-th block
+ setRefetchBlocks(true);
+ }
+ } catch (IOException ie) {
+ LOG.warn("src={}" + ", datanodes[{}]={}",
+ getSrc(), j, datanodes[j], ie);
+ }
+ }
+
+ return done;
+ }
+
+ /**
+ * Return true when sounds good to continue or retry, false when severe
+ * condition or totally failed.
+ */
+ private void tryDatanode(LocatedStripedBlock blockGroup,
+ StripedBlockInfo stripedBlockInfo,
+ DatanodeInfo datanode) throws IOException {
+
+ try (IOStreamPair pair = getClient().connectToDN(datanode,
+ getTimeout(), blockGroup.getBlockToken())) {
+
+ LOG.debug("write to {}: {}, blockGroup={}",
+ datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup);
+
+ // get block MD5
+ createSender(pair).blockGroupChecksum(stripedBlockInfo,
+ blockGroup.getBlockToken());
+
+ BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(pair.in));
+
+ String logInfo = "for blockGroup " + blockGroup +
+ " from datanode " + datanode;
+ DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+ OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse();
+
+ //read byte-per-checksum
+ final int bpc = checksumData.getBytesPerCrc();
+ if (bgIdx == 0) { //first block
+ setBytesPerCRC(bpc);
+ } else {
+ if (bpc != getBytesPerCRC()) {
+ throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ + " but bytesPerCRC=" + getBytesPerCRC());
+ }
+ }
+
+ //read crc-per-block
+ final long cpb = checksumData.getCrcPerBlock();
+ if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block
+ setCrcPerBlock(cpb);
+ }
+
+ //read md5
+ final MD5Hash md5 = new MD5Hash(
+ checksumData.getMd5().toByteArray());
+ md5.write(getMd5out());
+
+ // read crc-type
+ final DataChecksum.Type ct;
+ if (checksumData.hasCrcType()) {
+ ct = PBHelperClient.convert(checksumData.getCrcType());
+ } else {
+ LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+ "inferring checksum by reading first byte");
+ ct = getClient().inferChecksumTypeByReading(blockGroup, datanode);
+ }
+
+ if (bgIdx == 0) {
+ setCrcType(ct);
+ } else if (getCrcType() != DataChecksum.Type.MIXED &&
+ getCrcType() != ct) {
+ // if crc types are mixed in a file
+ setCrcType(DataChecksum.Type.MIXED);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ if (bgIdx == 0) {
+ LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+ + ", crcPerBlock=" + getCrcPerBlock());
+ }
+ LOG.debug("got reply from " + datanode + ": md5=" + md5);
+ }
+ }
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 4aa545b333..ad3f2ad90a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
@@ -197,6 +198,17 @@ void copyBlock(final ExtendedBlock blk,
* @param blockToken security token for accessing the block.
* @throws IOException
*/
- void blockChecksum(final ExtendedBlock blk,
- final Token blockToken) throws IOException;
+ void blockChecksum(ExtendedBlock blk,
+ Token blockToken) throws IOException;
+
+
+ /**
+ * Get striped block group checksum (MD5 of CRC32).
+ *
+ * @param stripedBlockInfo a striped block info.
+ * @param blockToken security token for accessing the block.
+ * @throws IOException
+ */
+ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
+ Token blockToken) throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
index 511574c414..94250e5e7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
@@ -38,6 +38,7 @@ public enum Op {
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
+ BLOCK_GROUP_CHECKSUM((byte)90),
CUSTOM((byte)127);
/** The code for this operation. */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 65456815ae..585ed99b1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -28,11 +28,13 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -261,4 +263,21 @@ public void blockChecksum(final ExtendedBlock blk,
send(out, Op.BLOCK_CHECKSUM, proto);
}
+
+ @Override
+ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
+ Token blockToken) throws IOException {
+ OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildBaseHeader(
+ stripedBlockInfo.getBlock(), blockToken))
+ .setDatanodes(PBHelperClient.convertToProto(
+ stripedBlockInfo.getDatanodes()))
+ .addAllBlockTokens(PBHelperClient.convert(
+ stripedBlockInfo.getBlockTokens()))
+ .setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
+ stripedBlockInfo.getErasureCodingPolicy()))
+ .build();
+
+ send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 38e875c012..4759373212 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -553,10 +553,8 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
proto.getCorrupt(),
cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
List tokenProtos = proto.getBlockTokensList();
- Token[] blockTokens = new Token[indices.length];
- for (int i = 0; i < indices.length; i++) {
- blockTokens[i] = convert(tokenProtos.get(i));
- }
+ Token[] blockTokens =
+ convertTokens(tokenProtos);
((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
}
lb.setBlockToken(convert(proto.getBlockToken()));
@@ -564,6 +562,18 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
return lb;
}
+ static public Token[] convertTokens(
+ List tokenProtos) {
+
+ @SuppressWarnings("unchecked")
+ Token[] blockTokens = new Token[tokenProtos.size()];
+ for (int i = 0; i < blockTokens.length; i++) {
+ blockTokens[i] = convert(tokenProtos.get(i));
+ }
+
+ return blockTokens;
+ }
+
static public DatanodeInfo convert(DatanodeInfoProto di) {
if (di == null) return null;
return new DatanodeInfo(
@@ -815,9 +825,7 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
byte[] indices = sb.getBlockIndices();
builder.setBlockIndices(PBHelperClient.getByteString(indices));
Token[] blockTokens = sb.getBlockTokens();
- for (int i = 0; i < indices.length; i++) {
- builder.addBlockTokens(PBHelperClient.convert(blockTokens[i]));
- }
+ builder.addAllBlockTokens(convert(blockTokens));
}
return builder.setB(PBHelperClient.convert(b.getBlock()))
@@ -825,6 +833,16 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
}
+ public static List convert(
+ Token[] blockTokens) {
+ List results = new ArrayList<>(blockTokens.length);
+ for (Token bt : blockTokens) {
+ results.add(convert(bt));
+ }
+
+ return results;
+ }
+
public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
List cList = proto.getCreationPolicy()
.getStorageTypesList();
@@ -2500,4 +2518,14 @@ public static ErasureCodingPolicyProto convertErasureCodingPolicy(
.setId(policy.getId());
return builder.build();
}
+
+ public static HdfsProtos.DatanodeInfosProto convertToProto(
+ DatanodeInfo[] datanodeInfos) {
+ HdfsProtos.DatanodeInfosProto.Builder builder =
+ HdfsProtos.DatanodeInfosProto.newBuilder();
+ for (DatanodeInfo datanodeInfo : datanodeInfos) {
+ builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
+ }
+ return builder.build();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 43772e2efb..0819376aac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -75,6 +75,18 @@ public class StripedBlockUtil {
public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
+ /**
+ * Parses a striped block group into individual blocks.
+ * @param bg The striped block group
+ * @param ecPolicy The erasure coding policy
+ * @return An array of the blocks in the group
+ */
+ public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
+ ErasureCodingPolicy ecPolicy) {
+ return parseStripedBlockGroup(bg, ecPolicy.getCellSize(),
+ ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
+ }
+
/**
* This method parses a striped block group into individual blocks.
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index a091d417d7..522ee06b68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -74,7 +74,6 @@ message OpReadBlockProto {
optional CachingStrategyProto cachingStrategy = 5;
}
-
message ChecksumProto {
required ChecksumTypeProto type = 1;
required uint32 bytesPerChecksum = 2;
@@ -149,6 +148,14 @@ message OpBlockChecksumProto {
required BaseHeaderProto header = 1;
}
+message OpBlockGroupChecksumProto {
+ required BaseHeaderProto header = 1;
+ required DatanodeInfosProto datanodes = 2;
+ // each internal block has a block token
+ repeated hadoop.common.TokenProto blockTokens = 3;
+ required ErasureCodingPolicyProto ecPolicy = 4;
+}
+
/**
* An ID uniquely identifying a shared memory segment.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index e0401574ce..b2f26f8d3c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -26,11 +26,13 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@@ -111,6 +113,9 @@ protected final void processOp(Op op) throws IOException {
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
+ case BLOCK_GROUP_CHECKSUM:
+ opStripedBlockChecksum(in);
+ break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
@@ -290,4 +295,27 @@ private void opBlockChecksum(DataInputStream in) throws IOException {
if (traceScope != null) traceScope.close();
}
}
+
+ /** Receive OP_STRIPED_BLOCK_CHECKSUM. */
+ private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
+ OpBlockGroupChecksumProto proto =
+ OpBlockGroupChecksumProto.parseFrom(vintPrefixed(dis));
+ TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+ proto.getClass().getSimpleName());
+ StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(
+ PBHelperClient.convert(proto.getHeader().getBlock()),
+ PBHelperClient.convert(proto.getDatanodes()),
+ PBHelperClient.convertTokens(proto.getBlockTokensList()),
+ PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy())
+ );
+
+ try {
+ blockGroupChecksum(stripedBlockInfo,
+ PBHelperClient.convert(proto.getHeader().getToken()));
+ } finally {
+ if (traceScope != null) {
+ traceScope.close();
+ }
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index 9a5552db3a..1f1a25c418 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -19,16 +19,30 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
@@ -41,13 +55,87 @@ final class BlockChecksumHelper {
static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
- private BlockChecksumHelper() {}
+ private BlockChecksumHelper() {
+ }
/**
* The abstract base block checksum computer.
*/
- static abstract class BlockChecksumComputer {
+ static abstract class AbstractBlockChecksumComputer {
private final DataNode datanode;
+
+ private byte[] outBytes;
+ private int bytesPerCRC = -1;
+ private DataChecksum.Type crcType = null;
+ private long crcPerBlock = -1;
+ private int checksumSize = -1;
+
+ AbstractBlockChecksumComputer(DataNode datanode) throws IOException {
+ this.datanode = datanode;
+ }
+
+ abstract void compute() throws IOException;
+
+ Sender createSender(IOStreamPair pair) {
+ DataOutputStream out = (DataOutputStream) pair.out;
+ return new Sender(out);
+ }
+
+ DataNode getDatanode() {
+ return datanode;
+ }
+
+ InputStream getBlockInputStream(ExtendedBlock block, long seekOffset)
+ throws IOException {
+ return datanode.data.getBlockInputStream(block, seekOffset);
+ }
+
+ void setOutBytes(byte[] bytes) {
+ this.outBytes = bytes;
+ }
+
+ byte[] getOutBytes() {
+ return outBytes;
+ }
+
+ int getBytesPerCRC() {
+ return bytesPerCRC;
+ }
+
+ public void setBytesPerCRC(int bytesPerCRC) {
+ this.bytesPerCRC = bytesPerCRC;
+ }
+
+ public void setCrcType(DataChecksum.Type crcType) {
+ this.crcType = crcType;
+ }
+
+ public void setCrcPerBlock(long crcPerBlock) {
+ this.crcPerBlock = crcPerBlock;
+ }
+
+ public void setChecksumSize(int checksumSize) {
+ this.checksumSize = checksumSize;
+ }
+
+ DataChecksum.Type getCrcType() {
+ return crcType;
+ }
+
+ long getCrcPerBlock() {
+ return crcPerBlock;
+ }
+
+ int getChecksumSize() {
+ return checksumSize;
+ }
+ }
+
+ /**
+ * The abstract base block checksum computer.
+ */
+ static abstract class BlockChecksumComputer
+ extends AbstractBlockChecksumComputer {
private final ExtendedBlock block;
// client side now can specify a range of the block for checksum
private final long requestLength;
@@ -56,17 +144,12 @@ static abstract class BlockChecksumComputer {
private final long visibleLength;
private final boolean partialBlk;
- private byte[] outBytes;
- private int bytesPerCRC = -1;
- private DataChecksum.Type crcType = null;
- private long crcPerBlock = -1;
- private int checksumSize = -1;
private BlockMetadataHeader header;
private DataChecksum checksum;
BlockChecksumComputer(DataNode datanode,
ExtendedBlock block) throws IOException {
- this.datanode = datanode;
+ super(datanode);
this.block = block;
this.requestLength = block.getNumBytes();
Preconditions.checkArgument(requestLength >= 0);
@@ -81,98 +164,80 @@ static abstract class BlockChecksumComputer {
new BufferedInputStream(metadataIn, ioFileBufferSize));
}
- protected DataNode getDatanode() {
- return datanode;
+ Sender createSender(IOStreamPair pair) {
+ DataOutputStream out = (DataOutputStream) pair.out;
+ return new Sender(out);
}
- protected ExtendedBlock getBlock() {
+
+ ExtendedBlock getBlock() {
return block;
}
- protected long getRequestLength() {
+ long getRequestLength() {
return requestLength;
}
- protected LengthInputStream getMetadataIn() {
+ LengthInputStream getMetadataIn() {
return metadataIn;
}
- protected DataInputStream getChecksumIn() {
+ DataInputStream getChecksumIn() {
return checksumIn;
}
- protected long getVisibleLength() {
+ long getVisibleLength() {
return visibleLength;
}
- protected boolean isPartialBlk() {
+ boolean isPartialBlk() {
return partialBlk;
}
- protected void setOutBytes(byte[] bytes) {
- this.outBytes = bytes;
- }
-
- protected byte[] getOutBytes() {
- return outBytes;
- }
-
- protected int getBytesPerCRC() {
- return bytesPerCRC;
- }
-
- protected DataChecksum.Type getCrcType() {
- return crcType;
- }
-
- protected long getCrcPerBlock() {
- return crcPerBlock;
- }
-
- protected int getChecksumSize() {
- return checksumSize;
- }
-
- protected BlockMetadataHeader getHeader() {
+ BlockMetadataHeader getHeader() {
return header;
}
- protected DataChecksum getChecksum() {
+ DataChecksum getChecksum() {
return checksum;
}
/**
* Perform the block checksum computing.
+ *
* @throws IOException
*/
abstract void compute() throws IOException;
/**
* Read block metadata header.
+ *
* @throws IOException
*/
- protected void readHeader() throws IOException {
+ void readHeader() throws IOException {
//read metadata file
header = BlockMetadataHeader.readHeader(checksumIn);
checksum = header.getChecksum();
- checksumSize = checksum.getChecksumSize();
- bytesPerCRC = checksum.getBytesPerChecksum();
- crcPerBlock = checksumSize <= 0 ? 0 :
+ setChecksumSize(checksum.getChecksumSize());
+ setBytesPerCRC(checksum.getBytesPerChecksum());
+ long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0 :
(metadataIn.getLength() -
- BlockMetadataHeader.getHeaderSize()) / checksumSize;
- crcType = checksum.getChecksumType();
+ BlockMetadataHeader.getHeaderSize()) / checksum.getChecksumSize();
+ setCrcPerBlock(crcPerBlock);
+ setCrcType(checksum.getChecksumType());
}
/**
* Calculate partial block checksum.
+ *
* @return
* @throws IOException
*/
- protected byte[] crcPartialBlock() throws IOException {
- int partialLength = (int) (requestLength % bytesPerCRC);
+ byte[] crcPartialBlock() throws IOException {
+ int partialLength = (int) (requestLength % getBytesPerCRC());
if (partialLength > 0) {
byte[] buf = new byte[partialLength];
- final InputStream blockIn = datanode.data.getBlockInputStream(block,
+ final InputStream blockIn = getBlockInputStream(block,
requestLength - partialLength);
try {
// Get the CRC of the partialLength.
@@ -181,7 +246,7 @@ protected byte[] crcPartialBlock() throws IOException {
IOUtils.closeStream(blockIn);
}
checksum.update(buf, 0, partialLength);
- byte[] partialCrc = new byte[checksumSize];
+ byte[] partialCrc = new byte[getChecksumSize()];
checksum.writeValue(partialCrc, 0, true);
return partialCrc;
}
@@ -229,7 +294,7 @@ private MD5Hash checksumWholeBlock() throws IOException {
}
private MD5Hash checksumPartialBlock() throws IOException {
- byte[] buffer = new byte[4*1024];
+ byte[] buffer = new byte[4 * 1024];
MessageDigest digester = MD5Hash.getDigester();
long remaining = (getRequestLength() / getBytesPerCRC())
@@ -251,4 +316,115 @@ private MD5Hash checksumPartialBlock() throws IOException {
return new MD5Hash(digester.digest());
}
}
-}
+
+ /**
+ * Non-striped block group checksum computer for striped blocks.
+ */
+ static class BlockGroupNonStripedChecksumComputer
+ extends AbstractBlockChecksumComputer {
+
+ private final ExtendedBlock blockGroup;
+ private final ErasureCodingPolicy ecPolicy;
+ private final DatanodeInfo[] datanodes;
+ private final Token[] blockTokens;
+
+ private final DataOutputBuffer md5writer = new DataOutputBuffer();
+
+ BlockGroupNonStripedChecksumComputer(DataNode datanode,
+ StripedBlockInfo stripedBlockInfo)
+ throws IOException {
+ super(datanode);
+ this.blockGroup = stripedBlockInfo.getBlock();
+ this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
+ this.datanodes = stripedBlockInfo.getDatanodes();
+ this.blockTokens = stripedBlockInfo.getBlockTokens();
+ }
+
+ @Override
+ void compute() throws IOException {
+ for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) {
+ ExtendedBlock block =
+ StripedBlockUtil.constructInternalBlock(blockGroup,
+ ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx);
+ DatanodeInfo targetDatanode = datanodes[idx];
+ Token blockToken = blockTokens[idx];
+ checksumBlock(block, idx, blockToken, targetDatanode);
+ }
+
+ MD5Hash md5out = MD5Hash.digest(md5writer.getData());
+ setOutBytes(md5out.getDigest());
+ }
+
+ private void checksumBlock(ExtendedBlock block, int blockIdx,
+ Token blockToken,
+ DatanodeInfo targetDatanode) throws IOException {
+ int timeout = 3000;
+ try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode,
+ timeout, block, blockToken)) {
+
+ LOG.debug("write to {}: {}, block={}",
+ getDatanode(), Op.BLOCK_CHECKSUM, block);
+
+ // get block MD5
+ createSender(pair).blockChecksum(block, blockToken);
+
+ final DataTransferProtos.BlockOpResponseProto reply =
+ DataTransferProtos.BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(pair.in));
+
+ String logInfo = "for block " + block
+ + " from datanode " + targetDatanode;
+ DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+ DataTransferProtos.OpBlockChecksumResponseProto checksumData =
+ reply.getChecksumResponse();
+
+ //read byte-per-checksum
+ final int bpc = checksumData.getBytesPerCrc();
+ if (blockIdx == 0) { //first block
+ setBytesPerCRC(bpc);
+ } else if (bpc != getBytesPerCRC()) {
+ throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ + " but bytesPerCRC=" + getBytesPerCRC());
+ }
+
+ //read crc-per-block
+ final long cpb = checksumData.getCrcPerBlock();
+ if (blockIdx == 0) {
+ setCrcPerBlock(cpb);
+ }
+
+ //read md5
+ final MD5Hash md5 = new MD5Hash(
+ checksumData.getMd5().toByteArray());
+ md5.write(md5writer);
+
+ // read crc-type
+ final DataChecksum.Type ct;
+ if (checksumData.hasCrcType()) {
+ ct = PBHelperClient.convert(checksumData.getCrcType());
+ } else {
+ LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+ "inferring checksum by reading first byte");
+ ct = DataChecksum.Type.DEFAULT;
+ }
+
+ if (blockIdx == 0) { // first block
+ setCrcType(ct);
+ } else if (getCrcType() != DataChecksum.Type.MIXED &&
+ getCrcType() != ct) {
+ // if crc types are mixed in a file
+ setCrcType(DataChecksum.Type.MIXED);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ if (blockIdx == 0) {
+ LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+ + ", crcPerBlock=" + getCrcPerBlock());
+ }
+ LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 1d4a79ac88..63bf5ae536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -46,7 +47,9 @@
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockGroupNonStripedChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
@@ -923,6 +926,46 @@ public void blockChecksum(ExtendedBlock block,
datanode.metrics.addBlockChecksumOp(elapsed());
}
+ @Override
+ public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
+ final Token blockToken)
+ throws IOException {
+ updateCurrentThreadName("Getting checksum for block group" +
+ stripedBlockInfo.getBlock());
+ final DataOutputStream out = new DataOutputStream(getOutputStream());
+ checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken,
+ Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
+
+ AbstractBlockChecksumComputer maker =
+ new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo);
+
+ try {
+ maker.compute();
+
+ //write reply
+ BlockOpResponseProto.newBuilder()
+ .setStatus(SUCCESS)
+ .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
+ .setBytesPerCrc(maker.getBytesPerCRC())
+ .setCrcPerBlock(maker.getCrcPerBlock())
+ .setMd5(ByteString.copyFrom(maker.getOutBytes()))
+ .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+ .build()
+ .writeDelimitedTo(out);
+ out.flush();
+ } catch (IOException ioe) {
+ LOG.info("blockChecksum " + stripedBlockInfo.getBlock() +
+ " received exception " + ioe);
+ incrDatanodeNetworkErrors();
+ throw ioe;
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
+ //update metrics
+ datanode.metrics.addBlockChecksumOp(elapsed());
+ }
+
@Override
public void copyBlock(final ExtendedBlock block,
final Token blockToken) throws IOException {