diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml index 2c3329e47a..9d6ab9a102 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml @@ -8,6 +8,7 @@ + diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 3506d3ac70..88bd21909d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1704,7 +1704,10 @@ public DataEncryptionKey newDataEncryptionKey() throws IOException { /** * Get the checksum of the whole file or a range of the file. Note that the - * range always starts from the beginning of the file. + * range always starts from the beginning of the file. The file can be + * in replicated form, or striped mode. It can be used to checksum and compare + * two replicated files, or two striped files, but not applicable for two + * files of different block layout forms. * @param src The file path * @param length the length of the range, i.e., the range is [0, length] * @return The checksum @@ -1717,7 +1720,11 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) LocatedBlocks blockLocations = getBlockLocations(src, length); - FileChecksumHelper.FileChecksumComputer maker = + FileChecksumHelper.FileChecksumComputer maker; + ErasureCodingPolicy ecPolicy = blockLocations.getErasureCodingPolicy(); + maker = ecPolicy != null ? + new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src, + length, blockLocations, namenode, this, ecPolicy) : new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length, blockLocations, namenode, this); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java index d15db9f11c..dfd939397b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java @@ -22,10 +22,13 @@ import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; @@ -75,7 +78,7 @@ static abstract class FileChecksumComputer { private int bytesPerCRC = -1; private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; private long crcPerBlock = 0; - private boolean refetchBlocks = false; + private boolean isRefetchBlocks = false; private int lastRetriedIndex = -1; /** @@ -127,8 +130,11 @@ LocatedBlocks getBlockLocations() { return blockLocations; } - void setBlockLocations(LocatedBlocks blockLocations) { - this.blockLocations = blockLocations; + void refetchBlocks() throws IOException { + this.blockLocations = getClient().getBlockLocations(getSrc(), + getLength()); + this.locatedBlocks = getBlockLocations().getLocatedBlocks(); + this.isRefetchBlocks = false; } int getTimeout() { @@ -143,10 +149,6 @@ List getLocatedBlocks() { return locatedBlocks; } - void setLocatedBlocks(List locatedBlocks) { - this.locatedBlocks = locatedBlocks; - } - long getRemaining() { return remaining; } @@ -180,11 +182,11 @@ void setCrcPerBlock(long crcPerBlock) { } boolean isRefetchBlocks() { - return refetchBlocks; + return isRefetchBlocks; } void setRefetchBlocks(boolean refetchBlocks) { - this.refetchBlocks = refetchBlocks; + this.isRefetchBlocks = refetchBlocks; } int getLastRetriedIndex() { @@ -278,10 +280,7 @@ void checksumBlocks() throws IOException { blockIdx < getLocatedBlocks().size() && getRemaining() >= 0; blockIdx++) { if (isRefetchBlocks()) { // refetch to get fresh tokens - setBlockLocations(getClient().getBlockLocations(getSrc(), - getLength())); - setLocatedBlocks(getBlockLocations().getLocatedBlocks()); - setRefetchBlocks(false); + refetchBlocks(); } LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx); @@ -380,15 +379,13 @@ private void tryDatanode(LocatedBlock locatedBlock, } //read md5 - final MD5Hash md5 = new MD5Hash( - checksumData.getMd5().toByteArray()); + final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray()); md5.write(getMd5out()); // read crc-type final DataChecksum.Type ct; if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData - .getCrcType()); + ct = PBHelperClient.convert(checksumData.getCrcType()); } else { LOG.debug("Retrieving checksum from an earlier-version DataNode: " + "inferring checksum by reading first byte"); @@ -413,4 +410,160 @@ && getCrcType() != ct) { } } } + + /** + * Striped file checksum computing. + */ + static class StripedFileNonStripedChecksumComputer + extends FileChecksumComputer { + private final ErasureCodingPolicy ecPolicy; + private int bgIdx; + + StripedFileNonStripedChecksumComputer(String src, long length, + LocatedBlocks blockLocations, + ClientProtocol namenode, + DFSClient client, + ErasureCodingPolicy ecPolicy) + throws IOException { + super(src, length, blockLocations, namenode, client); + + this.ecPolicy = ecPolicy; + } + + @Override + void checksumBlocks() throws IOException { + int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout(); + setTimeout(tmpTimeout); + + for (bgIdx = 0; + bgIdx < getLocatedBlocks().size() && getRemaining() >= 0; bgIdx++) { + if (isRefetchBlocks()) { // refetch to get fresh tokens + refetchBlocks(); + } + + LocatedBlock locatedBlock = getLocatedBlocks().get(bgIdx); + LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock; + + if (!checksumBlockGroup(blockGroup)) { + throw new IOException("Fail to get block MD5 for " + locatedBlock); + } + } + } + + + private boolean checksumBlockGroup( + LocatedStripedBlock blockGroup) throws IOException { + ExtendedBlock block = blockGroup.getBlock(); + if (getRemaining() < block.getNumBytes()) { + block.setNumBytes(getRemaining()); + } + setRemaining(getRemaining() - block.getNumBytes()); + + StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block, + blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy); + DatanodeInfo[] datanodes = blockGroup.getLocations(); + + //try each datanode in the block group. + boolean done = false; + for (int j = 0; !done && j < datanodes.length; j++) { + try { + tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]); + done = true; + } catch (InvalidBlockTokenException ibte) { + if (bgIdx > getLastRetriedIndex()) { + LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " + + "for file {} for block {} from datanode {}. Will retry " + + "the block once.", + getSrc(), block, datanodes[j]); + setLastRetriedIndex(bgIdx); + done = true; // actually it's not done; but we'll retry + bgIdx--; // repeat at bgIdx-th block + setRefetchBlocks(true); + } + } catch (IOException ie) { + LOG.warn("src={}" + ", datanodes[{}]={}", + getSrc(), j, datanodes[j], ie); + } + } + + return done; + } + + /** + * Return true when sounds good to continue or retry, false when severe + * condition or totally failed. + */ + private void tryDatanode(LocatedStripedBlock blockGroup, + StripedBlockInfo stripedBlockInfo, + DatanodeInfo datanode) throws IOException { + + try (IOStreamPair pair = getClient().connectToDN(datanode, + getTimeout(), blockGroup.getBlockToken())) { + + LOG.debug("write to {}: {}, blockGroup={}", + datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup); + + // get block MD5 + createSender(pair).blockGroupChecksum(stripedBlockInfo, + blockGroup.getBlockToken()); + + BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(pair.in)); + + String logInfo = "for blockGroup " + blockGroup + + " from datanode " + datanode; + DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); + + OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse(); + + //read byte-per-checksum + final int bpc = checksumData.getBytesPerCrc(); + if (bgIdx == 0) { //first block + setBytesPerCRC(bpc); + } else { + if (bpc != getBytesPerCRC()) { + throw new IOException("Byte-per-checksum not matched: bpc=" + bpc + + " but bytesPerCRC=" + getBytesPerCRC()); + } + } + + //read crc-per-block + final long cpb = checksumData.getCrcPerBlock(); + if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block + setCrcPerBlock(cpb); + } + + //read md5 + final MD5Hash md5 = new MD5Hash( + checksumData.getMd5().toByteArray()); + md5.write(getMd5out()); + + // read crc-type + final DataChecksum.Type ct; + if (checksumData.hasCrcType()) { + ct = PBHelperClient.convert(checksumData.getCrcType()); + } else { + LOG.debug("Retrieving checksum from an earlier-version DataNode: " + + "inferring checksum by reading first byte"); + ct = getClient().inferChecksumTypeByReading(blockGroup, datanode); + } + + if (bgIdx == 0) { + setCrcType(ct); + } else if (getCrcType() != DataChecksum.Type.MIXED && + getCrcType() != ct) { + // if crc types are mixed in a file + setCrcType(DataChecksum.Type.MIXED); + } + + if (LOG.isDebugEnabled()) { + if (bgIdx == 0) { + LOG.debug("set bytesPerCRC=" + getBytesPerCRC() + + ", crcPerBlock=" + getCrcPerBlock()); + } + LOG.debug("got reply from " + datanode + ": md5=" + md5); + } + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java new file mode 100644 index 0000000000..74e80810d4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; + +/** + * Striped block info that can be sent elsewhere to do block group level things, + * like checksum, and etc. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class StripedBlockInfo { + private final ExtendedBlock block; + private final DatanodeInfo[] datanodes; + private final Token[] blockTokens; + private final ErasureCodingPolicy ecPolicy; + + public StripedBlockInfo(ExtendedBlock block, DatanodeInfo[] datanodes, + Token[] blockTokens, + ErasureCodingPolicy ecPolicy) { + this.block = block; + this.datanodes = datanodes; + this.blockTokens = blockTokens; + this.ecPolicy = ecPolicy; + } + + public ExtendedBlock getBlock() { + return block; + } + + public DatanodeInfo[] getDatanodes() { + return datanodes; + } + + public Token[] getBlockTokens() { + return blockTokens; + } + + public ErasureCodingPolicy getErasureCodingPolicy() { + return ecPolicy; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 4aa545b333..ad3f2ad90a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; @@ -197,6 +198,17 @@ void copyBlock(final ExtendedBlock blk, * @param blockToken security token for accessing the block. * @throws IOException */ - void blockChecksum(final ExtendedBlock blk, - final Token blockToken) throws IOException; + void blockChecksum(ExtendedBlock blk, + Token blockToken) throws IOException; + + + /** + * Get striped block group checksum (MD5 of CRC32). + * + * @param stripedBlockInfo a striped block info. + * @param blockToken security token for accessing the block. + * @throws IOException + */ + void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, + Token blockToken) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java index 511574c414..94250e5e7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -38,6 +38,7 @@ public enum Op { REQUEST_SHORT_CIRCUIT_FDS((byte)87), RELEASE_SHORT_CIRCUIT_FDS((byte)88), REQUEST_SHORT_CIRCUIT_SHM((byte)89), + BLOCK_GROUP_CHECKSUM((byte)90), CUSTOM((byte)127); /** The code for this operation. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 65456815ae..585ed99b1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -28,11 +28,13 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -261,4 +263,21 @@ public void blockChecksum(final ExtendedBlock blk, send(out, Op.BLOCK_CHECKSUM, proto); } + + @Override + public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, + Token blockToken) throws IOException { + OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader( + stripedBlockInfo.getBlock(), blockToken)) + .setDatanodes(PBHelperClient.convertToProto( + stripedBlockInfo.getDatanodes())) + .addAllBlockTokens(PBHelperClient.convert( + stripedBlockInfo.getBlockTokens())) + .setEcPolicy(PBHelperClient.convertErasureCodingPolicy( + stripedBlockInfo.getErasureCodingPolicy())) + .build(); + + send(out, Op.BLOCK_GROUP_CHECKSUM, proto); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 38e875c012..4759373212 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -553,10 +553,8 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) { proto.getCorrupt(), cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); List tokenProtos = proto.getBlockTokensList(); - Token[] blockTokens = new Token[indices.length]; - for (int i = 0; i < indices.length; i++) { - blockTokens[i] = convert(tokenProtos.get(i)); - } + Token[] blockTokens = + convertTokens(tokenProtos); ((LocatedStripedBlock) lb).setBlockTokens(blockTokens); } lb.setBlockToken(convert(proto.getBlockToken())); @@ -564,6 +562,18 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) { return lb; } + static public Token[] convertTokens( + List tokenProtos) { + + @SuppressWarnings("unchecked") + Token[] blockTokens = new Token[tokenProtos.size()]; + for (int i = 0; i < blockTokens.length; i++) { + blockTokens[i] = convert(tokenProtos.get(i)); + } + + return blockTokens; + } + static public DatanodeInfo convert(DatanodeInfoProto di) { if (di == null) return null; return new DatanodeInfo( @@ -815,9 +825,7 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) { byte[] indices = sb.getBlockIndices(); builder.setBlockIndices(PBHelperClient.getByteString(indices)); Token[] blockTokens = sb.getBlockTokens(); - for (int i = 0; i < indices.length; i++) { - builder.addBlockTokens(PBHelperClient.convert(blockTokens[i])); - } + builder.addAllBlockTokens(convert(blockTokens)); } return builder.setB(PBHelperClient.convert(b.getBlock())) @@ -825,6 +833,16 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) { .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); } + public static List convert( + Token[] blockTokens) { + List results = new ArrayList<>(blockTokens.length); + for (Token bt : blockTokens) { + results.add(convert(bt)); + } + + return results; + } + public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) { List cList = proto.getCreationPolicy() .getStorageTypesList(); @@ -2500,4 +2518,14 @@ public static ErasureCodingPolicyProto convertErasureCodingPolicy( .setId(policy.getId()); return builder.build(); } + + public static HdfsProtos.DatanodeInfosProto convertToProto( + DatanodeInfo[] datanodeInfos) { + HdfsProtos.DatanodeInfosProto.Builder builder = + HdfsProtos.DatanodeInfosProto.newBuilder(); + for (DatanodeInfo datanodeInfo : datanodeInfos) { + builder.addDatanodes(PBHelperClient.convert(datanodeInfo)); + } + return builder.build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 43772e2efb..0819376aac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -75,6 +75,18 @@ public class StripedBlockUtil { public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class); + /** + * Parses a striped block group into individual blocks. + * @param bg The striped block group + * @param ecPolicy The erasure coding policy + * @return An array of the blocks in the group + */ + public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg, + ErasureCodingPolicy ecPolicy) { + return parseStripedBlockGroup(bg, ecPolicy.getCellSize(), + ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()); + } + /** * This method parses a striped block group into individual blocks. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index a091d417d7..522ee06b68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -74,7 +74,6 @@ message OpReadBlockProto { optional CachingStrategyProto cachingStrategy = 5; } - message ChecksumProto { required ChecksumTypeProto type = 1; required uint32 bytesPerChecksum = 2; @@ -149,6 +148,14 @@ message OpBlockChecksumProto { required BaseHeaderProto header = 1; } +message OpBlockGroupChecksumProto { + required BaseHeaderProto header = 1; + required DatanodeInfosProto datanodes = 2; + // each internal block has a block token + repeated hadoop.common.TokenProto blockTokens = 3; + required ErasureCodingPolicyProto ecPolicy = 4; +} + /** * An ID uniquely identifying a shared memory segment. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index e0401574ce..b2f26f8d3c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -26,11 +26,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -111,6 +113,9 @@ protected final void processOp(Op op) throws IOException { case BLOCK_CHECKSUM: opBlockChecksum(in); break; + case BLOCK_GROUP_CHECKSUM: + opStripedBlockChecksum(in); + break; case TRANSFER_BLOCK: opTransferBlock(in); break; @@ -290,4 +295,27 @@ private void opBlockChecksum(DataInputStream in) throws IOException { if (traceScope != null) traceScope.close(); } } + + /** Receive OP_STRIPED_BLOCK_CHECKSUM. */ + private void opStripedBlockChecksum(DataInputStream dis) throws IOException { + OpBlockGroupChecksumProto proto = + OpBlockGroupChecksumProto.parseFrom(vintPrefixed(dis)); + TraceScope traceScope = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName()); + StripedBlockInfo stripedBlockInfo = new StripedBlockInfo( + PBHelperClient.convert(proto.getHeader().getBlock()), + PBHelperClient.convert(proto.getDatanodes()), + PBHelperClient.convertTokens(proto.getBlockTokensList()), + PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy()) + ); + + try { + blockGroupChecksum(stripedBlockInfo, + PBHelperClient.convert(proto.getHeader().getToken())); + } finally { + if (traceScope != null) { + traceScope.close(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java index 9a5552db3a..1f1a25c418 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java @@ -19,16 +19,30 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Op; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.security.MessageDigest; @@ -41,13 +55,87 @@ final class BlockChecksumHelper { static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class); - private BlockChecksumHelper() {} + private BlockChecksumHelper() { + } /** * The abstract base block checksum computer. */ - static abstract class BlockChecksumComputer { + static abstract class AbstractBlockChecksumComputer { private final DataNode datanode; + + private byte[] outBytes; + private int bytesPerCRC = -1; + private DataChecksum.Type crcType = null; + private long crcPerBlock = -1; + private int checksumSize = -1; + + AbstractBlockChecksumComputer(DataNode datanode) throws IOException { + this.datanode = datanode; + } + + abstract void compute() throws IOException; + + Sender createSender(IOStreamPair pair) { + DataOutputStream out = (DataOutputStream) pair.out; + return new Sender(out); + } + + DataNode getDatanode() { + return datanode; + } + + InputStream getBlockInputStream(ExtendedBlock block, long seekOffset) + throws IOException { + return datanode.data.getBlockInputStream(block, seekOffset); + } + + void setOutBytes(byte[] bytes) { + this.outBytes = bytes; + } + + byte[] getOutBytes() { + return outBytes; + } + + int getBytesPerCRC() { + return bytesPerCRC; + } + + public void setBytesPerCRC(int bytesPerCRC) { + this.bytesPerCRC = bytesPerCRC; + } + + public void setCrcType(DataChecksum.Type crcType) { + this.crcType = crcType; + } + + public void setCrcPerBlock(long crcPerBlock) { + this.crcPerBlock = crcPerBlock; + } + + public void setChecksumSize(int checksumSize) { + this.checksumSize = checksumSize; + } + + DataChecksum.Type getCrcType() { + return crcType; + } + + long getCrcPerBlock() { + return crcPerBlock; + } + + int getChecksumSize() { + return checksumSize; + } + } + + /** + * The abstract base block checksum computer. + */ + static abstract class BlockChecksumComputer + extends AbstractBlockChecksumComputer { private final ExtendedBlock block; // client side now can specify a range of the block for checksum private final long requestLength; @@ -56,17 +144,12 @@ static abstract class BlockChecksumComputer { private final long visibleLength; private final boolean partialBlk; - private byte[] outBytes; - private int bytesPerCRC = -1; - private DataChecksum.Type crcType = null; - private long crcPerBlock = -1; - private int checksumSize = -1; private BlockMetadataHeader header; private DataChecksum checksum; BlockChecksumComputer(DataNode datanode, ExtendedBlock block) throws IOException { - this.datanode = datanode; + super(datanode); this.block = block; this.requestLength = block.getNumBytes(); Preconditions.checkArgument(requestLength >= 0); @@ -81,98 +164,80 @@ static abstract class BlockChecksumComputer { new BufferedInputStream(metadataIn, ioFileBufferSize)); } - protected DataNode getDatanode() { - return datanode; + Sender createSender(IOStreamPair pair) { + DataOutputStream out = (DataOutputStream) pair.out; + return new Sender(out); } - protected ExtendedBlock getBlock() { + + ExtendedBlock getBlock() { return block; } - protected long getRequestLength() { + long getRequestLength() { return requestLength; } - protected LengthInputStream getMetadataIn() { + LengthInputStream getMetadataIn() { return metadataIn; } - protected DataInputStream getChecksumIn() { + DataInputStream getChecksumIn() { return checksumIn; } - protected long getVisibleLength() { + long getVisibleLength() { return visibleLength; } - protected boolean isPartialBlk() { + boolean isPartialBlk() { return partialBlk; } - protected void setOutBytes(byte[] bytes) { - this.outBytes = bytes; - } - - protected byte[] getOutBytes() { - return outBytes; - } - - protected int getBytesPerCRC() { - return bytesPerCRC; - } - - protected DataChecksum.Type getCrcType() { - return crcType; - } - - protected long getCrcPerBlock() { - return crcPerBlock; - } - - protected int getChecksumSize() { - return checksumSize; - } - - protected BlockMetadataHeader getHeader() { + BlockMetadataHeader getHeader() { return header; } - protected DataChecksum getChecksum() { + DataChecksum getChecksum() { return checksum; } /** * Perform the block checksum computing. + * * @throws IOException */ abstract void compute() throws IOException; /** * Read block metadata header. + * * @throws IOException */ - protected void readHeader() throws IOException { + void readHeader() throws IOException { //read metadata file header = BlockMetadataHeader.readHeader(checksumIn); checksum = header.getChecksum(); - checksumSize = checksum.getChecksumSize(); - bytesPerCRC = checksum.getBytesPerChecksum(); - crcPerBlock = checksumSize <= 0 ? 0 : + setChecksumSize(checksum.getChecksumSize()); + setBytesPerCRC(checksum.getBytesPerChecksum()); + long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0 : (metadataIn.getLength() - - BlockMetadataHeader.getHeaderSize()) / checksumSize; - crcType = checksum.getChecksumType(); + BlockMetadataHeader.getHeaderSize()) / checksum.getChecksumSize(); + setCrcPerBlock(crcPerBlock); + setCrcType(checksum.getChecksumType()); } /** * Calculate partial block checksum. + * * @return * @throws IOException */ - protected byte[] crcPartialBlock() throws IOException { - int partialLength = (int) (requestLength % bytesPerCRC); + byte[] crcPartialBlock() throws IOException { + int partialLength = (int) (requestLength % getBytesPerCRC()); if (partialLength > 0) { byte[] buf = new byte[partialLength]; - final InputStream blockIn = datanode.data.getBlockInputStream(block, + final InputStream blockIn = getBlockInputStream(block, requestLength - partialLength); try { // Get the CRC of the partialLength. @@ -181,7 +246,7 @@ protected byte[] crcPartialBlock() throws IOException { IOUtils.closeStream(blockIn); } checksum.update(buf, 0, partialLength); - byte[] partialCrc = new byte[checksumSize]; + byte[] partialCrc = new byte[getChecksumSize()]; checksum.writeValue(partialCrc, 0, true); return partialCrc; } @@ -229,7 +294,7 @@ private MD5Hash checksumWholeBlock() throws IOException { } private MD5Hash checksumPartialBlock() throws IOException { - byte[] buffer = new byte[4*1024]; + byte[] buffer = new byte[4 * 1024]; MessageDigest digester = MD5Hash.getDigester(); long remaining = (getRequestLength() / getBytesPerCRC()) @@ -251,4 +316,115 @@ private MD5Hash checksumPartialBlock() throws IOException { return new MD5Hash(digester.digest()); } } -} + + /** + * Non-striped block group checksum computer for striped blocks. + */ + static class BlockGroupNonStripedChecksumComputer + extends AbstractBlockChecksumComputer { + + private final ExtendedBlock blockGroup; + private final ErasureCodingPolicy ecPolicy; + private final DatanodeInfo[] datanodes; + private final Token[] blockTokens; + + private final DataOutputBuffer md5writer = new DataOutputBuffer(); + + BlockGroupNonStripedChecksumComputer(DataNode datanode, + StripedBlockInfo stripedBlockInfo) + throws IOException { + super(datanode); + this.blockGroup = stripedBlockInfo.getBlock(); + this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy(); + this.datanodes = stripedBlockInfo.getDatanodes(); + this.blockTokens = stripedBlockInfo.getBlockTokens(); + } + + @Override + void compute() throws IOException { + for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) { + ExtendedBlock block = + StripedBlockUtil.constructInternalBlock(blockGroup, + ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx); + DatanodeInfo targetDatanode = datanodes[idx]; + Token blockToken = blockTokens[idx]; + checksumBlock(block, idx, blockToken, targetDatanode); + } + + MD5Hash md5out = MD5Hash.digest(md5writer.getData()); + setOutBytes(md5out.getDigest()); + } + + private void checksumBlock(ExtendedBlock block, int blockIdx, + Token blockToken, + DatanodeInfo targetDatanode) throws IOException { + int timeout = 3000; + try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode, + timeout, block, blockToken)) { + + LOG.debug("write to {}: {}, block={}", + getDatanode(), Op.BLOCK_CHECKSUM, block); + + // get block MD5 + createSender(pair).blockChecksum(block, blockToken); + + final DataTransferProtos.BlockOpResponseProto reply = + DataTransferProtos.BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(pair.in)); + + String logInfo = "for block " + block + + " from datanode " + targetDatanode; + DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); + + DataTransferProtos.OpBlockChecksumResponseProto checksumData = + reply.getChecksumResponse(); + + //read byte-per-checksum + final int bpc = checksumData.getBytesPerCrc(); + if (blockIdx == 0) { //first block + setBytesPerCRC(bpc); + } else if (bpc != getBytesPerCRC()) { + throw new IOException("Byte-per-checksum not matched: bpc=" + bpc + + " but bytesPerCRC=" + getBytesPerCRC()); + } + + //read crc-per-block + final long cpb = checksumData.getCrcPerBlock(); + if (blockIdx == 0) { + setCrcPerBlock(cpb); + } + + //read md5 + final MD5Hash md5 = new MD5Hash( + checksumData.getMd5().toByteArray()); + md5.write(md5writer); + + // read crc-type + final DataChecksum.Type ct; + if (checksumData.hasCrcType()) { + ct = PBHelperClient.convert(checksumData.getCrcType()); + } else { + LOG.debug("Retrieving checksum from an earlier-version DataNode: " + + "inferring checksum by reading first byte"); + ct = DataChecksum.Type.DEFAULT; + } + + if (blockIdx == 0) { // first block + setCrcType(ct); + } else if (getCrcType() != DataChecksum.Type.MIXED && + getCrcType() != ct) { + // if crc types are mixed in a file + setCrcType(DataChecksum.Type.MIXED); + } + + if (LOG.isDebugEnabled()) { + if (blockIdx == 0) { + LOG.debug("set bytesPerCRC=" + getBytesPerCRC() + + ", crcPerBlock=" + getCrcPerBlock()); + } + LOG.debug("got reply from " + targetDatanode + ": md5=" + md5); + } + } + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 1d4a79ac88..63bf5ae536 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; @@ -46,7 +47,9 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer; +import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer; import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer; +import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockGroupNonStripedChecksumComputer; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo; @@ -923,6 +926,46 @@ public void blockChecksum(ExtendedBlock block, datanode.metrics.addBlockChecksumOp(elapsed()); } + @Override + public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, + final Token blockToken) + throws IOException { + updateCurrentThreadName("Getting checksum for block group" + + stripedBlockInfo.getBlock()); + final DataOutputStream out = new DataOutputStream(getOutputStream()); + checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken, + Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ); + + AbstractBlockChecksumComputer maker = + new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo); + + try { + maker.compute(); + + //write reply + BlockOpResponseProto.newBuilder() + .setStatus(SUCCESS) + .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder() + .setBytesPerCrc(maker.getBytesPerCRC()) + .setCrcPerBlock(maker.getCrcPerBlock()) + .setMd5(ByteString.copyFrom(maker.getOutBytes())) + .setCrcType(PBHelperClient.convert(maker.getCrcType()))) + .build() + .writeDelimitedTo(out); + out.flush(); + } catch (IOException ioe) { + LOG.info("blockChecksum " + stripedBlockInfo.getBlock() + + " received exception " + ioe); + incrDatanodeNetworkErrors(); + throw ioe; + } finally { + IOUtils.closeStream(out); + } + + //update metrics + datanode.metrics.addBlockChecksumOp(elapsed()); + } + @Override public void copyBlock(final ExtendedBlock block, final Token blockToken) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java new file mode 100644 index 0000000000..7cee344014 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +/** + * This test serves a prototype to demo the idea proposed so far. It creates two + * files using the same data, one is in replica mode, the other is in stripped + * layout. For simple, it assumes 6 data blocks in both files and the block size + * are the same. + */ +public class TestFileChecksum { + public static final Log LOG = LogFactory.getLog(TestFileChecksum.class); + + private int dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS; + private int parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS; + + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private Configuration conf; + private DFSClient client; + + private int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE; + private int stripesPerBlock = 6; + private int blockSize = cellSize * stripesPerBlock; + private int numBlockGroups = 10; + private int stripSize = cellSize * dataBlocks; + private int blockGroupSize = stripesPerBlock * stripSize; + private int fileSize = numBlockGroups * blockGroupSize; + + private String ecDir = "/striped"; + private String stripedFile1 = ecDir + "/stripedFileChecksum1"; + private String stripedFile2 = ecDir + "/stripedFileChecksum2"; + private String replicatedFile = "/replicatedFileChecksum"; + + @Before + public void setup() throws IOException { + int numDNs = dataBlocks + parityBlocks + 2; + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, + false); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + Path ecPath = new Path(ecDir); + cluster.getFileSystem().mkdir(ecPath, FsPermission.getDirDefault()); + cluster.getFileSystem().getClient().setErasureCodingPolicy(ecDir, null); + fs = cluster.getFileSystem(); + client = fs.getClient(); + + prepareTestFiles(); + + getDataNodeToKill(stripedFile1); + getDataNodeToKill(replicatedFile); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testStripedFileChecksum1() throws Exception { + int length = 0; + testStripedFileChecksum(length, length + 10); + } + + @Test + public void testStripedFileChecksum2() throws Exception { + int length = stripSize - 1; + testStripedFileChecksum(length, length - 10); + } + + @Test + public void testStripedFileChecksum3() throws Exception { + int length = stripSize; + testStripedFileChecksum(length, length - 10); + } + + @Test + public void testStripedFileChecksum4() throws Exception { + int length = stripSize + cellSize * 2; + testStripedFileChecksum(length, length - 10); + } + + @Test + public void testStripedFileChecksum5() throws Exception { + int length = blockGroupSize; + testStripedFileChecksum(length, length - 10); + } + + @Test + public void testStripedFileChecksum6() throws Exception { + int length = blockGroupSize + blockSize; + testStripedFileChecksum(length, length - 10); + } + + @Test + public void testStripedFileChecksum7() throws Exception { + int length = -1; // whole file + testStripedFileChecksum(length, fileSize); + } + + void testStripedFileChecksum(int range1, int range2) throws Exception { + FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, + range1, false); + FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2, + range1, false); + FileChecksum stripedFileChecksum3 = getFileChecksum(stripedFile2, + range2, false); + + LOG.info("stripedFileChecksum1:" + stripedFileChecksum1); + LOG.info("stripedFileChecksum2:" + stripedFileChecksum2); + LOG.info("stripedFileChecksum3:" + stripedFileChecksum3); + + Assert.assertTrue(stripedFileChecksum1.equals(stripedFileChecksum2)); + if (range1 >=0 && range1 != range2) { + Assert.assertFalse(stripedFileChecksum1.equals(stripedFileChecksum3)); + } + } + + @Test + public void testStripedAndReplicatedFileChecksum() throws Exception { + FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, + 10, false); + FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile, + 10, false); + + Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum)); + } + + /* + // TODO: allow datanode failure, HDFS-9833 + @Test + public void testStripedAndReplicatedWithFailure() throws Exception { + FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, + 10, true); + FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile, + 10, true); + + Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum)); + }*/ + + private FileChecksum getFileChecksum(String filePath, int range, + boolean killDn) throws Exception { + int dnIdxToDie = -1; + if (killDn) { + dnIdxToDie = getDataNodeToKill(filePath); + DataNode dnToDie = cluster.getDataNodes().get(dnIdxToDie); + shutdownDataNode(dnToDie); + } + + Path testPath = new Path(filePath); + FileChecksum fc; + + if (range >= 0) { + fc = fs.getFileChecksum(testPath, range); + } else { + fc = fs.getFileChecksum(testPath); + } + + if (dnIdxToDie != -1) { + cluster.restartDataNode(dnIdxToDie, true); + } + + return fc; + } + + void prepareTestFiles() throws IOException { + byte[] fileData = StripedFileTestUtil.generateBytes(fileSize); + + String[] filePaths = new String[] { + stripedFile1, stripedFile2, replicatedFile + }; + + for (String filePath : filePaths) { + Path testPath = new Path(filePath); + DFSTestUtil.writeFile(fs, testPath, fileData); + } + } + + void shutdownDataNode(DataNode dataNode) throws IOException { + /* + * Kill the datanode which contains one replica + * We need to make sure it dead in namenode: clear its update time and + * trigger NN to check heartbeat. + */ + dataNode.shutdown(); + cluster.setDataNodeDead(dataNode.getDatanodeId()); + } + + /** + * Determine the datanode that hosts the first block of the file. For simple + * this just returns the first datanode as it's firstly tried. + */ + int getDataNodeToKill(String filePath) throws IOException { + LocatedBlocks locatedBlocks = client.getLocatedBlocks(filePath, 0); + + LocatedBlock locatedBlock = locatedBlocks.get(0); + DatanodeInfo[] datanodes = locatedBlock.getLocations(); + DatanodeInfo chosenDn = datanodes[0]; + + int idx = 0; + for (DataNode dn : cluster.getDataNodes()) { + if (dn.getInfoPort() == chosenDn.getInfoPort()) { + return idx; + } + idx++; + } + + return -1; + } +}