diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml index 9d6ab9a102..2c3329e47a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml @@ -8,7 +8,6 @@ - diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 88bd21909d..3506d3ac70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1704,10 +1704,7 @@ public DataEncryptionKey newDataEncryptionKey() throws IOException { /** * Get the checksum of the whole file or a range of the file. Note that the - * range always starts from the beginning of the file. The file can be - * in replicated form, or striped mode. It can be used to checksum and compare - * two replicated files, or two striped files, but not applicable for two - * files of different block layout forms. + * range always starts from the beginning of the file. * @param src The file path * @param length the length of the range, i.e., the range is [0, length] * @return The checksum @@ -1720,11 +1717,7 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) LocatedBlocks blockLocations = getBlockLocations(src, length); - FileChecksumHelper.FileChecksumComputer maker; - ErasureCodingPolicy ecPolicy = blockLocations.getErasureCodingPolicy(); - maker = ecPolicy != null ? - new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src, - length, blockLocations, namenode, this, ecPolicy) : + FileChecksumHelper.FileChecksumComputer maker = new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length, blockLocations, namenode, this); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java index dfd939397b..d15db9f11c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java @@ -22,13 +22,10 @@ import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; -import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; @@ -78,7 +75,7 @@ static abstract class FileChecksumComputer { private int bytesPerCRC = -1; private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; private long crcPerBlock = 0; - private boolean isRefetchBlocks = false; + private boolean refetchBlocks = false; private int lastRetriedIndex = -1; 
/** @@ -130,11 +127,8 @@ LocatedBlocks getBlockLocations() { return blockLocations; } - void refetchBlocks() throws IOException { - this.blockLocations = getClient().getBlockLocations(getSrc(), - getLength()); - this.locatedBlocks = getBlockLocations().getLocatedBlocks(); - this.isRefetchBlocks = false; + void setBlockLocations(LocatedBlocks blockLocations) { + this.blockLocations = blockLocations; } int getTimeout() { @@ -149,6 +143,10 @@ List getLocatedBlocks() { return locatedBlocks; } + void setLocatedBlocks(List locatedBlocks) { + this.locatedBlocks = locatedBlocks; + } + long getRemaining() { return remaining; } @@ -182,11 +180,11 @@ void setCrcPerBlock(long crcPerBlock) { } boolean isRefetchBlocks() { - return isRefetchBlocks; + return refetchBlocks; } void setRefetchBlocks(boolean refetchBlocks) { - this.isRefetchBlocks = refetchBlocks; + this.refetchBlocks = refetchBlocks; } int getLastRetriedIndex() { @@ -280,7 +278,10 @@ void checksumBlocks() throws IOException { blockIdx < getLocatedBlocks().size() && getRemaining() >= 0; blockIdx++) { if (isRefetchBlocks()) { // refetch to get fresh tokens - refetchBlocks(); + setBlockLocations(getClient().getBlockLocations(getSrc(), + getLength())); + setLocatedBlocks(getBlockLocations().getLocatedBlocks()); + setRefetchBlocks(false); } LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx); @@ -379,13 +380,15 @@ private void tryDatanode(LocatedBlock locatedBlock, } //read md5 - final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray()); + final MD5Hash md5 = new MD5Hash( + checksumData.getMd5().toByteArray()); md5.write(getMd5out()); // read crc-type final DataChecksum.Type ct; if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData.getCrcType()); + ct = PBHelperClient.convert(checksumData + .getCrcType()); } else { LOG.debug("Retrieving checksum from an earlier-version DataNode: " + "inferring checksum by reading first byte"); @@ -410,160 +413,4 @@ && getCrcType() != ct) { } } } - - /** - * Striped file checksum computing. 
- */ - static class StripedFileNonStripedChecksumComputer - extends FileChecksumComputer { - private final ErasureCodingPolicy ecPolicy; - private int bgIdx; - - StripedFileNonStripedChecksumComputer(String src, long length, - LocatedBlocks blockLocations, - ClientProtocol namenode, - DFSClient client, - ErasureCodingPolicy ecPolicy) - throws IOException { - super(src, length, blockLocations, namenode, client); - - this.ecPolicy = ecPolicy; - } - - @Override - void checksumBlocks() throws IOException { - int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout(); - setTimeout(tmpTimeout); - - for (bgIdx = 0; - bgIdx < getLocatedBlocks().size() && getRemaining() >= 0; bgIdx++) { - if (isRefetchBlocks()) { // refetch to get fresh tokens - refetchBlocks(); - } - - LocatedBlock locatedBlock = getLocatedBlocks().get(bgIdx); - LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock; - - if (!checksumBlockGroup(blockGroup)) { - throw new IOException("Fail to get block MD5 for " + locatedBlock); - } - } - } - - - private boolean checksumBlockGroup( - LocatedStripedBlock blockGroup) throws IOException { - ExtendedBlock block = blockGroup.getBlock(); - if (getRemaining() < block.getNumBytes()) { - block.setNumBytes(getRemaining()); - } - setRemaining(getRemaining() - block.getNumBytes()); - - StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block, - blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy); - DatanodeInfo[] datanodes = blockGroup.getLocations(); - - //try each datanode in the block group. - boolean done = false; - for (int j = 0; !done && j < datanodes.length; j++) { - try { - tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]); - done = true; - } catch (InvalidBlockTokenException ibte) { - if (bgIdx > getLastRetriedIndex()) { - LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " - + "for file {} for block {} from datanode {}. Will retry " - + "the block once.", - getSrc(), block, datanodes[j]); - setLastRetriedIndex(bgIdx); - done = true; // actually it's not done; but we'll retry - bgIdx--; // repeat at bgIdx-th block - setRefetchBlocks(true); - } - } catch (IOException ie) { - LOG.warn("src={}" + ", datanodes[{}]={}", - getSrc(), j, datanodes[j], ie); - } - } - - return done; - } - - /** - * Return true when sounds good to continue or retry, false when severe - * condition or totally failed. 
- */ - private void tryDatanode(LocatedStripedBlock blockGroup, - StripedBlockInfo stripedBlockInfo, - DatanodeInfo datanode) throws IOException { - - try (IOStreamPair pair = getClient().connectToDN(datanode, - getTimeout(), blockGroup.getBlockToken())) { - - LOG.debug("write to {}: {}, blockGroup={}", - datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup); - - // get block MD5 - createSender(pair).blockGroupChecksum(stripedBlockInfo, - blockGroup.getBlockToken()); - - BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(pair.in)); - - String logInfo = "for blockGroup " + blockGroup + - " from datanode " + datanode; - DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); - - OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse(); - - //read byte-per-checksum - final int bpc = checksumData.getBytesPerCrc(); - if (bgIdx == 0) { //first block - setBytesPerCRC(bpc); - } else { - if (bpc != getBytesPerCRC()) { - throw new IOException("Byte-per-checksum not matched: bpc=" + bpc - + " but bytesPerCRC=" + getBytesPerCRC()); - } - } - - //read crc-per-block - final long cpb = checksumData.getCrcPerBlock(); - if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block - setCrcPerBlock(cpb); - } - - //read md5 - final MD5Hash md5 = new MD5Hash( - checksumData.getMd5().toByteArray()); - md5.write(getMd5out()); - - // read crc-type - final DataChecksum.Type ct; - if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData.getCrcType()); - } else { - LOG.debug("Retrieving checksum from an earlier-version DataNode: " + - "inferring checksum by reading first byte"); - ct = getClient().inferChecksumTypeByReading(blockGroup, datanode); - } - - if (bgIdx == 0) { - setCrcType(ct); - } else if (getCrcType() != DataChecksum.Type.MIXED && - getCrcType() != ct) { - // if crc types are mixed in a file - setCrcType(DataChecksum.Type.MIXED); - } - - if (LOG.isDebugEnabled()) { - if (bgIdx == 0) { - LOG.debug("set bytesPerCRC=" + getBytesPerCRC() - + ", crcPerBlock=" + getCrcPerBlock()); - } - LOG.debug("got reply from " + datanode + ": md5=" + md5); - } - } - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index ad3f2ad90a..4aa545b333 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; @@ -198,17 +197,6 @@ void copyBlock(final ExtendedBlock blk, * @param blockToken security token for accessing the block. * @throws IOException */ - void blockChecksum(ExtendedBlock blk, - Token blockToken) throws IOException; - - - /** - * Get striped block group checksum (MD5 of CRC32). - * - * @param stripedBlockInfo a striped block info. - * @param blockToken security token for accessing the block. 
- * @throws IOException - */ - void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, - Token blockToken) throws IOException; + void blockChecksum(final ExtendedBlock blk, + final Token blockToken) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java index 94250e5e7f..511574c414 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -38,7 +38,6 @@ public enum Op { REQUEST_SHORT_CIRCUIT_FDS((byte)87), RELEASE_SHORT_CIRCUIT_FDS((byte)88), REQUEST_SHORT_CIRCUIT_SHM((byte)89), - BLOCK_GROUP_CHECKSUM((byte)90), CUSTOM((byte)127); /** The code for this operation. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 585ed99b1e..65456815ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -28,13 +28,11 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -263,21 +261,4 @@ public void blockChecksum(final ExtendedBlock blk, send(out, Op.BLOCK_CHECKSUM, proto); } - - @Override - public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, - Token blockToken) throws IOException { - OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader( - stripedBlockInfo.getBlock(), blockToken)) - .setDatanodes(PBHelperClient.convertToProto( - stripedBlockInfo.getDatanodes())) - .addAllBlockTokens(PBHelperClient.convert( - stripedBlockInfo.getBlockTokens())) - .setEcPolicy(PBHelperClient.convertErasureCodingPolicy( - stripedBlockInfo.getErasureCodingPolicy())) - .build(); - - send(out, Op.BLOCK_GROUP_CHECKSUM, proto); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 4759373212..38e875c012 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -553,8 +553,10 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) { proto.getCorrupt(), cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); List tokenProtos = proto.getBlockTokensList(); - Token[] blockTokens = - convertTokens(tokenProtos); + Token[] blockTokens = new Token[indices.length]; + for (int i = 0; i < indices.length; i++) { + blockTokens[i] = convert(tokenProtos.get(i)); + } ((LocatedStripedBlock) lb).setBlockTokens(blockTokens); } lb.setBlockToken(convert(proto.getBlockToken())); @@ -562,18 +564,6 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) { return lb; } - static public Token[] convertTokens( - List tokenProtos) { - - @SuppressWarnings("unchecked") - Token[] blockTokens = new Token[tokenProtos.size()]; - for (int i = 0; i < blockTokens.length; i++) { - blockTokens[i] = convert(tokenProtos.get(i)); - } - - return blockTokens; - } - static public DatanodeInfo convert(DatanodeInfoProto di) { if (di == null) return null; return new DatanodeInfo( @@ -825,7 +815,9 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) { byte[] indices = sb.getBlockIndices(); builder.setBlockIndices(PBHelperClient.getByteString(indices)); Token[] blockTokens = sb.getBlockTokens(); - builder.addAllBlockTokens(convert(blockTokens)); + for (int i = 0; i < indices.length; i++) { + builder.addBlockTokens(PBHelperClient.convert(blockTokens[i])); + } } return builder.setB(PBHelperClient.convert(b.getBlock())) @@ -833,16 +825,6 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) { .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); } - public static List convert( - Token[] blockTokens) { - List results = new ArrayList<>(blockTokens.length); - for (Token bt : blockTokens) { - results.add(convert(bt)); - } - - return results; - } - public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) { List cList = proto.getCreationPolicy() .getStorageTypesList(); @@ -2518,14 +2500,4 @@ public static ErasureCodingPolicyProto convertErasureCodingPolicy( .setId(policy.getId()); return builder.build(); } - - public static HdfsProtos.DatanodeInfosProto convertToProto( - DatanodeInfo[] datanodeInfos) { - HdfsProtos.DatanodeInfosProto.Builder builder = - HdfsProtos.DatanodeInfosProto.newBuilder(); - for (DatanodeInfo datanodeInfo : datanodeInfos) { - builder.addDatanodes(PBHelperClient.convert(datanodeInfo)); - } - return builder.build(); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 0819376aac..43772e2efb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -75,18 +75,6 @@ public class StripedBlockUtil { public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class); - /** - * Parses a striped block group into individual blocks. 
- * @param bg The striped block group - * @param ecPolicy The erasure coding policy - * @return An array of the blocks in the group - */ - public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg, - ErasureCodingPolicy ecPolicy) { - return parseStripedBlockGroup(bg, ecPolicy.getCellSize(), - ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()); - } - /** * This method parses a striped block group into individual blocks. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index 522ee06b68..a091d417d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -74,6 +74,7 @@ message OpReadBlockProto { optional CachingStrategyProto cachingStrategy = 5; } + message ChecksumProto { required ChecksumTypeProto type = 1; required uint32 bytesPerChecksum = 2; @@ -148,14 +149,6 @@ message OpBlockChecksumProto { required BaseHeaderProto header = 1; } -message OpBlockGroupChecksumProto { - required BaseHeaderProto header = 1; - required DatanodeInfosProto datanodes = 2; - // each internal block has a block token - repeated hadoop.common.TokenProto blockTokens = 3; - required ErasureCodingPolicyProto ecPolicy = 4; -} - /** * An ID uniquely identifying a shared memory segment. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index b2f26f8d3c..e0401574ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -26,13 +26,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -113,9 +111,6 @@ protected final void processOp(Op op) throws IOException { case BLOCK_CHECKSUM: opBlockChecksum(in); break; - case BLOCK_GROUP_CHECKSUM: - opStripedBlockChecksum(in); - break; case TRANSFER_BLOCK: opTransferBlock(in); break; @@ -295,27 +290,4 @@ private void opBlockChecksum(DataInputStream in) throws IOException { if (traceScope != null) traceScope.close(); } } - - /** Receive OP_STRIPED_BLOCK_CHECKSUM. 
*/ - private void opStripedBlockChecksum(DataInputStream dis) throws IOException { - OpBlockGroupChecksumProto proto = - OpBlockGroupChecksumProto.parseFrom(vintPrefixed(dis)); - TraceScope traceScope = continueTraceSpan(proto.getHeader(), - proto.getClass().getSimpleName()); - StripedBlockInfo stripedBlockInfo = new StripedBlockInfo( - PBHelperClient.convert(proto.getHeader().getBlock()), - PBHelperClient.convert(proto.getDatanodes()), - PBHelperClient.convertTokens(proto.getBlockTokensList()), - PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy()) - ); - - try { - blockGroupChecksum(stripedBlockInfo, - PBHelperClient.convert(proto.getHeader().getToken())); - } finally { - if (traceScope != null) { - traceScope.close(); - } - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java index 1f1a25c418..9a5552db3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java @@ -19,30 +19,16 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.Op; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; -import org.apache.hadoop.hdfs.util.StripedBlockUtil; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.security.MessageDigest; @@ -55,87 +41,13 @@ final class BlockChecksumHelper { static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class); - private BlockChecksumHelper() { - } + private BlockChecksumHelper() {} /** * The abstract base block checksum computer. 
*/ - static abstract class AbstractBlockChecksumComputer { + static abstract class BlockChecksumComputer { private final DataNode datanode; - - private byte[] outBytes; - private int bytesPerCRC = -1; - private DataChecksum.Type crcType = null; - private long crcPerBlock = -1; - private int checksumSize = -1; - - AbstractBlockChecksumComputer(DataNode datanode) throws IOException { - this.datanode = datanode; - } - - abstract void compute() throws IOException; - - Sender createSender(IOStreamPair pair) { - DataOutputStream out = (DataOutputStream) pair.out; - return new Sender(out); - } - - DataNode getDatanode() { - return datanode; - } - - InputStream getBlockInputStream(ExtendedBlock block, long seekOffset) - throws IOException { - return datanode.data.getBlockInputStream(block, seekOffset); - } - - void setOutBytes(byte[] bytes) { - this.outBytes = bytes; - } - - byte[] getOutBytes() { - return outBytes; - } - - int getBytesPerCRC() { - return bytesPerCRC; - } - - public void setBytesPerCRC(int bytesPerCRC) { - this.bytesPerCRC = bytesPerCRC; - } - - public void setCrcType(DataChecksum.Type crcType) { - this.crcType = crcType; - } - - public void setCrcPerBlock(long crcPerBlock) { - this.crcPerBlock = crcPerBlock; - } - - public void setChecksumSize(int checksumSize) { - this.checksumSize = checksumSize; - } - - DataChecksum.Type getCrcType() { - return crcType; - } - - long getCrcPerBlock() { - return crcPerBlock; - } - - int getChecksumSize() { - return checksumSize; - } - } - - /** - * The abstract base block checksum computer. - */ - static abstract class BlockChecksumComputer - extends AbstractBlockChecksumComputer { private final ExtendedBlock block; // client side now can specify a range of the block for checksum private final long requestLength; @@ -144,12 +56,17 @@ static abstract class BlockChecksumComputer private final long visibleLength; private final boolean partialBlk; + private byte[] outBytes; + private int bytesPerCRC = -1; + private DataChecksum.Type crcType = null; + private long crcPerBlock = -1; + private int checksumSize = -1; private BlockMetadataHeader header; private DataChecksum checksum; BlockChecksumComputer(DataNode datanode, ExtendedBlock block) throws IOException { - super(datanode); + this.datanode = datanode; this.block = block; this.requestLength = block.getNumBytes(); Preconditions.checkArgument(requestLength >= 0); @@ -164,80 +81,98 @@ static abstract class BlockChecksumComputer new BufferedInputStream(metadataIn, ioFileBufferSize)); } - Sender createSender(IOStreamPair pair) { - DataOutputStream out = (DataOutputStream) pair.out; - return new Sender(out); + protected DataNode getDatanode() { + return datanode; } - - ExtendedBlock getBlock() { + protected ExtendedBlock getBlock() { return block; } - long getRequestLength() { + protected long getRequestLength() { return requestLength; } - LengthInputStream getMetadataIn() { + protected LengthInputStream getMetadataIn() { return metadataIn; } - DataInputStream getChecksumIn() { + protected DataInputStream getChecksumIn() { return checksumIn; } - long getVisibleLength() { + protected long getVisibleLength() { return visibleLength; } - boolean isPartialBlk() { + protected boolean isPartialBlk() { return partialBlk; } - BlockMetadataHeader getHeader() { + protected void setOutBytes(byte[] bytes) { + this.outBytes = bytes; + } + + protected byte[] getOutBytes() { + return outBytes; + } + + protected int getBytesPerCRC() { + return bytesPerCRC; + } + + protected DataChecksum.Type getCrcType() { + return 
crcType; + } + + protected long getCrcPerBlock() { + return crcPerBlock; + } + + protected int getChecksumSize() { + return checksumSize; + } + + protected BlockMetadataHeader getHeader() { return header; } - DataChecksum getChecksum() { + protected DataChecksum getChecksum() { return checksum; } /** * Perform the block checksum computing. - * * @throws IOException */ abstract void compute() throws IOException; /** * Read block metadata header. - * * @throws IOException */ - void readHeader() throws IOException { + protected void readHeader() throws IOException { //read metadata file header = BlockMetadataHeader.readHeader(checksumIn); checksum = header.getChecksum(); - setChecksumSize(checksum.getChecksumSize()); - setBytesPerCRC(checksum.getBytesPerChecksum()); - long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0 : + checksumSize = checksum.getChecksumSize(); + bytesPerCRC = checksum.getBytesPerChecksum(); + crcPerBlock = checksumSize <= 0 ? 0 : (metadataIn.getLength() - - BlockMetadataHeader.getHeaderSize()) / checksum.getChecksumSize(); - setCrcPerBlock(crcPerBlock); - setCrcType(checksum.getChecksumType()); + BlockMetadataHeader.getHeaderSize()) / checksumSize; + crcType = checksum.getChecksumType(); } /** * Calculate partial block checksum. - * * @return * @throws IOException */ - byte[] crcPartialBlock() throws IOException { - int partialLength = (int) (requestLength % getBytesPerCRC()); + protected byte[] crcPartialBlock() throws IOException { + int partialLength = (int) (requestLength % bytesPerCRC); if (partialLength > 0) { byte[] buf = new byte[partialLength]; - final InputStream blockIn = getBlockInputStream(block, + final InputStream blockIn = datanode.data.getBlockInputStream(block, requestLength - partialLength); try { // Get the CRC of the partialLength. @@ -246,7 +181,7 @@ byte[] crcPartialBlock() throws IOException { IOUtils.closeStream(blockIn); } checksum.update(buf, 0, partialLength); - byte[] partialCrc = new byte[getChecksumSize()]; + byte[] partialCrc = new byte[checksumSize]; checksum.writeValue(partialCrc, 0, true); return partialCrc; } @@ -294,7 +229,7 @@ private MD5Hash checksumWholeBlock() throws IOException { } private MD5Hash checksumPartialBlock() throws IOException { - byte[] buffer = new byte[4 * 1024]; + byte[] buffer = new byte[4*1024]; MessageDigest digester = MD5Hash.getDigester(); long remaining = (getRequestLength() / getBytesPerCRC()) @@ -316,115 +251,4 @@ private MD5Hash checksumPartialBlock() throws IOException { return new MD5Hash(digester.digest()); } } - - /** - * Non-striped block group checksum computer for striped blocks. 
- */ - static class BlockGroupNonStripedChecksumComputer - extends AbstractBlockChecksumComputer { - - private final ExtendedBlock blockGroup; - private final ErasureCodingPolicy ecPolicy; - private final DatanodeInfo[] datanodes; - private final Token[] blockTokens; - - private final DataOutputBuffer md5writer = new DataOutputBuffer(); - - BlockGroupNonStripedChecksumComputer(DataNode datanode, - StripedBlockInfo stripedBlockInfo) - throws IOException { - super(datanode); - this.blockGroup = stripedBlockInfo.getBlock(); - this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy(); - this.datanodes = stripedBlockInfo.getDatanodes(); - this.blockTokens = stripedBlockInfo.getBlockTokens(); - } - - @Override - void compute() throws IOException { - for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) { - ExtendedBlock block = - StripedBlockUtil.constructInternalBlock(blockGroup, - ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx); - DatanodeInfo targetDatanode = datanodes[idx]; - Token blockToken = blockTokens[idx]; - checksumBlock(block, idx, blockToken, targetDatanode); - } - - MD5Hash md5out = MD5Hash.digest(md5writer.getData()); - setOutBytes(md5out.getDigest()); - } - - private void checksumBlock(ExtendedBlock block, int blockIdx, - Token blockToken, - DatanodeInfo targetDatanode) throws IOException { - int timeout = 3000; - try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode, - timeout, block, blockToken)) { - - LOG.debug("write to {}: {}, block={}", - getDatanode(), Op.BLOCK_CHECKSUM, block); - - // get block MD5 - createSender(pair).blockChecksum(block, blockToken); - - final DataTransferProtos.BlockOpResponseProto reply = - DataTransferProtos.BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(pair.in)); - - String logInfo = "for block " + block - + " from datanode " + targetDatanode; - DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); - - DataTransferProtos.OpBlockChecksumResponseProto checksumData = - reply.getChecksumResponse(); - - //read byte-per-checksum - final int bpc = checksumData.getBytesPerCrc(); - if (blockIdx == 0) { //first block - setBytesPerCRC(bpc); - } else if (bpc != getBytesPerCRC()) { - throw new IOException("Byte-per-checksum not matched: bpc=" + bpc - + " but bytesPerCRC=" + getBytesPerCRC()); - } - - //read crc-per-block - final long cpb = checksumData.getCrcPerBlock(); - if (blockIdx == 0) { - setCrcPerBlock(cpb); - } - - //read md5 - final MD5Hash md5 = new MD5Hash( - checksumData.getMd5().toByteArray()); - md5.write(md5writer); - - // read crc-type - final DataChecksum.Type ct; - if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData.getCrcType()); - } else { - LOG.debug("Retrieving checksum from an earlier-version DataNode: " + - "inferring checksum by reading first byte"); - ct = DataChecksum.Type.DEFAULT; - } - - if (blockIdx == 0) { // first block - setCrcType(ct); - } else if (getCrcType() != DataChecksum.Type.MIXED && - getCrcType() != ct) { - // if crc types are mixed in a file - setCrcType(DataChecksum.Type.MIXED); - } - - if (LOG.isDebugEnabled()) { - if (blockIdx == 0) { - LOG.debug("set bytesPerCRC=" + getBytesPerCRC() - + ", crcPerBlock=" + getCrcPerBlock()); - } - LOG.debug("got reply from " + targetDatanode + ": md5=" + md5); - } - } - } - } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 63bf5ae536..1d4a79ac88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; @@ -47,9 +46,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer; -import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer; import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer; -import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockGroupNonStripedChecksumComputer; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo; @@ -926,46 +923,6 @@ public void blockChecksum(ExtendedBlock block, datanode.metrics.addBlockChecksumOp(elapsed()); } - @Override - public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, - final Token blockToken) - throws IOException { - updateCurrentThreadName("Getting checksum for block group" + - stripedBlockInfo.getBlock()); - final DataOutputStream out = new DataOutputStream(getOutputStream()); - checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken, - Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ); - - AbstractBlockChecksumComputer maker = - new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo); - - try { - maker.compute(); - - //write reply - BlockOpResponseProto.newBuilder() - .setStatus(SUCCESS) - .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder() - .setBytesPerCrc(maker.getBytesPerCRC()) - .setCrcPerBlock(maker.getCrcPerBlock()) - .setMd5(ByteString.copyFrom(maker.getOutBytes())) - .setCrcType(PBHelperClient.convert(maker.getCrcType()))) - .build() - .writeDelimitedTo(out); - out.flush(); - } catch (IOException ioe) { - LOG.info("blockChecksum " + stripedBlockInfo.getBlock() + - " received exception " + ioe); - incrDatanodeNetworkErrors(); - throw ioe; - } finally { - IOUtils.closeStream(out); - } - - //update metrics - datanode.metrics.addBlockChecksumOp(elapsed()); - } - @Override public void copyBlock(final ExtendedBlock block, final Token blockToken) throws IOException {
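Note on the resulting client path: with this revert, DFSClient.getFileChecksum() no longer inspects the file's ErasureCodingPolicy and always builds a FileChecksumHelper.ReplicatedFileChecksumComputer, while the striped pieces (Op.BLOCK_GROUP_CHECKSUM, OpBlockGroupChecksumProto, DataTransferProtocol#blockGroupChecksum, and the datanode-side BlockGroupNonStripedChecksumComputer) are removed. The following is a minimal sketch of the post-revert method body assembled from the hunks above; checkOpen(), Preconditions.checkArgument(), maker.compute(), and maker.getFileChecksum() are assumptions about surrounding DFSClient/FileChecksumHelper code that the hunks do not show.

    // Sketch of DFSClient.getFileChecksum after this revert (not the verbatim file).
    public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
        throws IOException {
      checkOpen();                                  // assumed, not shown in the hunk
      Preconditions.checkArgument(length >= 0);     // assumed, not shown in the hunk

      LocatedBlocks blockLocations = getBlockLocations(src, length);

      // Every file is handled as replicated; the ErasureCodingPolicy branch is gone.
      FileChecksumHelper.FileChecksumComputer maker =
          new FileChecksumHelper.ReplicatedFileChecksumComputer(
              src, length, blockLocations, namenode, this);

      maker.compute();                              // assumed helper API
      return maker.getFileChecksum();               // assumed helper API
    }

On the wire, datanodes built from this patch answer only per-block OP_BLOCK_CHECKSUM requests; the BLOCK_GROUP_CHECKSUM opcode is dropped from Op and from Receiver.processOp, so clients that still issue the striped group-checksum operation will not be understood by these datanodes.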