HDFS-9694. Make existing DFSClient#getFileChecksum() work for striped blocks. Contributed by Kai Zheng

This commit is contained in:
Uma Maheswara Rao G 2016-03-26 19:58:09 -07:00
parent a337ceb74e
commit 3a4ff7776e
14 changed files with 878 additions and 83 deletions

View File

@ -8,6 +8,7 @@
<Class name="org.apache.hadoop.hdfs.protocol.LocatedBlock"/>
<Class name="org.apache.hadoop.hdfs.protocol.BlockStoragePolicy"/>
<Class name="org.apache.hadoop.hdfs.protocol.CorruptFileBlocks"/>
<Class name="org.apache.hadoop.hdfs.protocol.StripedBlockInfo"/>
<Class name="org.apache.hadoop.hdfs.protocol.DirectoryListing"/>
<Class name="org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier"/>
<Class name="org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey"/>

View File

@ -1704,7 +1704,10 @@ public DataEncryptionKey newDataEncryptionKey() throws IOException {
/**
* Get the checksum of the whole file or a range of the file. Note that the
* range always starts from the beginning of the file.
* range always starts from the beginning of the file. The file can be
* in replicated form, or striped mode. It can be used to checksum and compare
* two replicated files, or two striped files, but not applicable for two
* files of different block layout forms.
* @param src The file path
* @param length the length of the range, i.e., the range is [0, length]
* @return The checksum
@ -1717,7 +1720,11 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
LocatedBlocks blockLocations = getBlockLocations(src, length);
FileChecksumHelper.FileChecksumComputer maker =
FileChecksumHelper.FileChecksumComputer maker;
ErasureCodingPolicy ecPolicy = blockLocations.getErasureCodingPolicy();
maker = ecPolicy != null ?
new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src,
length, blockLocations, namenode, this, ecPolicy) :
new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
blockLocations, namenode, this);

View File

@ -22,10 +22,13 @@
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
@ -75,7 +78,7 @@ static abstract class FileChecksumComputer {
private int bytesPerCRC = -1;
private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
private long crcPerBlock = 0;
private boolean refetchBlocks = false;
private boolean isRefetchBlocks = false;
private int lastRetriedIndex = -1;
/**
@ -127,8 +130,11 @@ LocatedBlocks getBlockLocations() {
return blockLocations;
}
void setBlockLocations(LocatedBlocks blockLocations) {
this.blockLocations = blockLocations;
void refetchBlocks() throws IOException {
this.blockLocations = getClient().getBlockLocations(getSrc(),
getLength());
this.locatedBlocks = getBlockLocations().getLocatedBlocks();
this.isRefetchBlocks = false;
}
int getTimeout() {
@ -143,10 +149,6 @@ List<LocatedBlock> getLocatedBlocks() {
return locatedBlocks;
}
void setLocatedBlocks(List<LocatedBlock> locatedBlocks) {
this.locatedBlocks = locatedBlocks;
}
long getRemaining() {
return remaining;
}
@ -180,11 +182,11 @@ void setCrcPerBlock(long crcPerBlock) {
}
boolean isRefetchBlocks() {
return refetchBlocks;
return isRefetchBlocks;
}
void setRefetchBlocks(boolean refetchBlocks) {
this.refetchBlocks = refetchBlocks;
this.isRefetchBlocks = refetchBlocks;
}
int getLastRetriedIndex() {
@ -278,10 +280,7 @@ void checksumBlocks() throws IOException {
blockIdx < getLocatedBlocks().size() && getRemaining() >= 0;
blockIdx++) {
if (isRefetchBlocks()) { // refetch to get fresh tokens
setBlockLocations(getClient().getBlockLocations(getSrc(),
getLength()));
setLocatedBlocks(getBlockLocations().getLocatedBlocks());
setRefetchBlocks(false);
refetchBlocks();
}
LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
@ -380,15 +379,13 @@ private void tryDatanode(LocatedBlock locatedBlock,
}
//read md5
final MD5Hash md5 = new MD5Hash(
checksumData.getMd5().toByteArray());
final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
md5.write(getMd5out());
// read crc-type
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
ct = PBHelperClient.convert(checksumData
.getCrcType());
ct = PBHelperClient.convert(checksumData.getCrcType());
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte");
@ -413,4 +410,160 @@ && getCrcType() != ct) {
}
}
}
/**
* Striped file checksum computing.
*/
static class StripedFileNonStripedChecksumComputer
extends FileChecksumComputer {
private final ErasureCodingPolicy ecPolicy;
private int bgIdx;
StripedFileNonStripedChecksumComputer(String src, long length,
LocatedBlocks blockLocations,
ClientProtocol namenode,
DFSClient client,
ErasureCodingPolicy ecPolicy)
throws IOException {
super(src, length, blockLocations, namenode, client);
this.ecPolicy = ecPolicy;
}
@Override
void checksumBlocks() throws IOException {
int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout();
setTimeout(tmpTimeout);
for (bgIdx = 0;
bgIdx < getLocatedBlocks().size() && getRemaining() >= 0; bgIdx++) {
if (isRefetchBlocks()) { // refetch to get fresh tokens
refetchBlocks();
}
LocatedBlock locatedBlock = getLocatedBlocks().get(bgIdx);
LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
if (!checksumBlockGroup(blockGroup)) {
throw new IOException("Fail to get block MD5 for " + locatedBlock);
}
}
}
private boolean checksumBlockGroup(
LocatedStripedBlock blockGroup) throws IOException {
ExtendedBlock block = blockGroup.getBlock();
if (getRemaining() < block.getNumBytes()) {
block.setNumBytes(getRemaining());
}
setRemaining(getRemaining() - block.getNumBytes());
StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy);
DatanodeInfo[] datanodes = blockGroup.getLocations();
//try each datanode in the block group.
boolean done = false;
for (int j = 0; !done && j < datanodes.length; j++) {
try {
tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]);
done = true;
} catch (InvalidBlockTokenException ibte) {
if (bgIdx > getLastRetriedIndex()) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ "for file {} for block {} from datanode {}. Will retry "
+ "the block once.",
getSrc(), block, datanodes[j]);
setLastRetriedIndex(bgIdx);
done = true; // actually it's not done; but we'll retry
bgIdx--; // repeat at bgIdx-th block
setRefetchBlocks(true);
}
} catch (IOException ie) {
LOG.warn("src={}" + ", datanodes[{}]={}",
getSrc(), j, datanodes[j], ie);
}
}
return done;
}
/**
* Return true when sounds good to continue or retry, false when severe
* condition or totally failed.
*/
private void tryDatanode(LocatedStripedBlock blockGroup,
StripedBlockInfo stripedBlockInfo,
DatanodeInfo datanode) throws IOException {
try (IOStreamPair pair = getClient().connectToDN(datanode,
getTimeout(), blockGroup.getBlockToken())) {
LOG.debug("write to {}: {}, blockGroup={}",
datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup);
// get block MD5
createSender(pair).blockGroupChecksum(stripedBlockInfo,
blockGroup.getBlockToken());
BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(pair.in));
String logInfo = "for blockGroup " + blockGroup +
" from datanode " + datanode;
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse();
//read byte-per-checksum
final int bpc = checksumData.getBytesPerCrc();
if (bgIdx == 0) { //first block
setBytesPerCRC(bpc);
} else {
if (bpc != getBytesPerCRC()) {
throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ " but bytesPerCRC=" + getBytesPerCRC());
}
}
//read crc-per-block
final long cpb = checksumData.getCrcPerBlock();
if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block
setCrcPerBlock(cpb);
}
//read md5
final MD5Hash md5 = new MD5Hash(
checksumData.getMd5().toByteArray());
md5.write(getMd5out());
// read crc-type
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
ct = PBHelperClient.convert(checksumData.getCrcType());
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte");
ct = getClient().inferChecksumTypeByReading(blockGroup, datanode);
}
if (bgIdx == 0) {
setCrcType(ct);
} else if (getCrcType() != DataChecksum.Type.MIXED &&
getCrcType() != ct) {
// if crc types are mixed in a file
setCrcType(DataChecksum.Type.MIXED);
}
if (LOG.isDebugEnabled()) {
if (bgIdx == 0) {
LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+ ", crcPerBlock=" + getCrcPerBlock());
}
LOG.debug("got reply from " + datanode + ": md5=" + md5);
}
}
}
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
/**
* Striped block info that can be sent elsewhere to do block group level things,
* like checksum, and etc.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class StripedBlockInfo {
private final ExtendedBlock block;
private final DatanodeInfo[] datanodes;
private final Token<BlockTokenIdentifier>[] blockTokens;
private final ErasureCodingPolicy ecPolicy;
public StripedBlockInfo(ExtendedBlock block, DatanodeInfo[] datanodes,
Token<BlockTokenIdentifier>[] blockTokens,
ErasureCodingPolicy ecPolicy) {
this.block = block;
this.datanodes = datanodes;
this.blockTokens = blockTokens;
this.ecPolicy = ecPolicy;
}
public ExtendedBlock getBlock() {
return block;
}
public DatanodeInfo[] getDatanodes() {
return datanodes;
}
public Token<BlockTokenIdentifier>[] getBlockTokens() {
return blockTokens;
}
public ErasureCodingPolicy getErasureCodingPolicy() {
return ecPolicy;
}
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
@ -197,6 +198,17 @@ void copyBlock(final ExtendedBlock blk,
* @param blockToken security token for accessing the block.
* @throws IOException
*/
void blockChecksum(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken) throws IOException;
void blockChecksum(ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken) throws IOException;
/**
* Get striped block group checksum (MD5 of CRC32).
*
* @param stripedBlockInfo a striped block info.
* @param blockToken security token for accessing the block.
* @throws IOException
*/
void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
Token<BlockTokenIdentifier> blockToken) throws IOException;
}

View File

@ -38,6 +38,7 @@ public enum Op {
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
BLOCK_GROUP_CHECKSUM((byte)90),
CUSTOM((byte)127);
/** The code for this operation. */

View File

@ -28,11 +28,13 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@ -261,4 +263,21 @@ public void blockChecksum(final ExtendedBlock blk,
send(out, Op.BLOCK_CHECKSUM, proto);
}
@Override
public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
Token<BlockTokenIdentifier> blockToken) throws IOException {
OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(
stripedBlockInfo.getBlock(), blockToken))
.setDatanodes(PBHelperClient.convertToProto(
stripedBlockInfo.getDatanodes()))
.addAllBlockTokens(PBHelperClient.convert(
stripedBlockInfo.getBlockTokens()))
.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
stripedBlockInfo.getErasureCodingPolicy()))
.build();
send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
}
}

View File

@ -553,10 +553,8 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
proto.getCorrupt(),
cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
List<TokenProto> tokenProtos = proto.getBlockTokensList();
Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
for (int i = 0; i < indices.length; i++) {
blockTokens[i] = convert(tokenProtos.get(i));
}
Token<BlockTokenIdentifier>[] blockTokens =
convertTokens(tokenProtos);
((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
}
lb.setBlockToken(convert(proto.getBlockToken()));
@ -564,6 +562,18 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
return lb;
}
static public Token<BlockTokenIdentifier>[] convertTokens(
List<TokenProto> tokenProtos) {
@SuppressWarnings("unchecked")
Token<BlockTokenIdentifier>[] blockTokens = new Token[tokenProtos.size()];
for (int i = 0; i < blockTokens.length; i++) {
blockTokens[i] = convert(tokenProtos.get(i));
}
return blockTokens;
}
static public DatanodeInfo convert(DatanodeInfoProto di) {
if (di == null) return null;
return new DatanodeInfo(
@ -815,9 +825,7 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
byte[] indices = sb.getBlockIndices();
builder.setBlockIndices(PBHelperClient.getByteString(indices));
Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
for (int i = 0; i < indices.length; i++) {
builder.addBlockTokens(PBHelperClient.convert(blockTokens[i]));
}
builder.addAllBlockTokens(convert(blockTokens));
}
return builder.setB(PBHelperClient.convert(b.getBlock()))
@ -825,6 +833,16 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
}
public static List<TokenProto> convert(
Token<BlockTokenIdentifier>[] blockTokens) {
List<TokenProto> results = new ArrayList<>(blockTokens.length);
for (Token<BlockTokenIdentifier> bt : blockTokens) {
results.add(convert(bt));
}
return results;
}
public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
List<StorageTypeProto> cList = proto.getCreationPolicy()
.getStorageTypesList();
@ -2500,4 +2518,14 @@ public static ErasureCodingPolicyProto convertErasureCodingPolicy(
.setId(policy.getId());
return builder.build();
}
public static HdfsProtos.DatanodeInfosProto convertToProto(
DatanodeInfo[] datanodeInfos) {
HdfsProtos.DatanodeInfosProto.Builder builder =
HdfsProtos.DatanodeInfosProto.newBuilder();
for (DatanodeInfo datanodeInfo : datanodeInfos) {
builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
}
return builder.build();
}
}

View File

@ -75,6 +75,18 @@ public class StripedBlockUtil {
public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
/**
* Parses a striped block group into individual blocks.
* @param bg The striped block group
* @param ecPolicy The erasure coding policy
* @return An array of the blocks in the group
*/
public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
ErasureCodingPolicy ecPolicy) {
return parseStripedBlockGroup(bg, ecPolicy.getCellSize(),
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
}
/**
* This method parses a striped block group into individual blocks.
*

View File

@ -74,7 +74,6 @@ message OpReadBlockProto {
optional CachingStrategyProto cachingStrategy = 5;
}
message ChecksumProto {
required ChecksumTypeProto type = 1;
required uint32 bytesPerChecksum = 2;
@ -149,6 +148,14 @@ message OpBlockChecksumProto {
required BaseHeaderProto header = 1;
}
message OpBlockGroupChecksumProto {
required BaseHeaderProto header = 1;
required DatanodeInfosProto datanodes = 2;
// each internal block has a block token
repeated hadoop.common.TokenProto blockTokens = 3;
required ErasureCodingPolicyProto ecPolicy = 4;
}
/**
* An ID uniquely identifying a shared memory segment.
*/

View File

@ -26,11 +26,13 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
@ -111,6 +113,9 @@ protected final void processOp(Op op) throws IOException {
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
case BLOCK_GROUP_CHECKSUM:
opStripedBlockChecksum(in);
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
@ -290,4 +295,27 @@ private void opBlockChecksum(DataInputStream in) throws IOException {
if (traceScope != null) traceScope.close();
}
}
/** Receive OP_STRIPED_BLOCK_CHECKSUM. */
private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
OpBlockGroupChecksumProto proto =
OpBlockGroupChecksumProto.parseFrom(vintPrefixed(dis));
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(
PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelperClient.convert(proto.getDatanodes()),
PBHelperClient.convertTokens(proto.getBlockTokensList()),
PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy())
);
try {
blockGroupChecksum(stripedBlockInfo,
PBHelperClient.convert(proto.getHeader().getToken()));
} finally {
if (traceScope != null) {
traceScope.close();
}
}
}
}

View File

@ -19,16 +19,30 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
@ -41,13 +55,87 @@ final class BlockChecksumHelper {
static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
private BlockChecksumHelper() {}
private BlockChecksumHelper() {
}
/**
* The abstract base block checksum computer.
*/
static abstract class BlockChecksumComputer {
static abstract class AbstractBlockChecksumComputer {
private final DataNode datanode;
private byte[] outBytes;
private int bytesPerCRC = -1;
private DataChecksum.Type crcType = null;
private long crcPerBlock = -1;
private int checksumSize = -1;
AbstractBlockChecksumComputer(DataNode datanode) throws IOException {
this.datanode = datanode;
}
abstract void compute() throws IOException;
Sender createSender(IOStreamPair pair) {
DataOutputStream out = (DataOutputStream) pair.out;
return new Sender(out);
}
DataNode getDatanode() {
return datanode;
}
InputStream getBlockInputStream(ExtendedBlock block, long seekOffset)
throws IOException {
return datanode.data.getBlockInputStream(block, seekOffset);
}
void setOutBytes(byte[] bytes) {
this.outBytes = bytes;
}
byte[] getOutBytes() {
return outBytes;
}
int getBytesPerCRC() {
return bytesPerCRC;
}
public void setBytesPerCRC(int bytesPerCRC) {
this.bytesPerCRC = bytesPerCRC;
}
public void setCrcType(DataChecksum.Type crcType) {
this.crcType = crcType;
}
public void setCrcPerBlock(long crcPerBlock) {
this.crcPerBlock = crcPerBlock;
}
public void setChecksumSize(int checksumSize) {
this.checksumSize = checksumSize;
}
DataChecksum.Type getCrcType() {
return crcType;
}
long getCrcPerBlock() {
return crcPerBlock;
}
int getChecksumSize() {
return checksumSize;
}
}
/**
* The abstract base block checksum computer.
*/
static abstract class BlockChecksumComputer
extends AbstractBlockChecksumComputer {
private final ExtendedBlock block;
// client side now can specify a range of the block for checksum
private final long requestLength;
@ -56,17 +144,12 @@ static abstract class BlockChecksumComputer {
private final long visibleLength;
private final boolean partialBlk;
private byte[] outBytes;
private int bytesPerCRC = -1;
private DataChecksum.Type crcType = null;
private long crcPerBlock = -1;
private int checksumSize = -1;
private BlockMetadataHeader header;
private DataChecksum checksum;
BlockChecksumComputer(DataNode datanode,
ExtendedBlock block) throws IOException {
this.datanode = datanode;
super(datanode);
this.block = block;
this.requestLength = block.getNumBytes();
Preconditions.checkArgument(requestLength >= 0);
@ -81,98 +164,80 @@ static abstract class BlockChecksumComputer {
new BufferedInputStream(metadataIn, ioFileBufferSize));
}
protected DataNode getDatanode() {
return datanode;
Sender createSender(IOStreamPair pair) {
DataOutputStream out = (DataOutputStream) pair.out;
return new Sender(out);
}
protected ExtendedBlock getBlock() {
ExtendedBlock getBlock() {
return block;
}
protected long getRequestLength() {
long getRequestLength() {
return requestLength;
}
protected LengthInputStream getMetadataIn() {
LengthInputStream getMetadataIn() {
return metadataIn;
}
protected DataInputStream getChecksumIn() {
DataInputStream getChecksumIn() {
return checksumIn;
}
protected long getVisibleLength() {
long getVisibleLength() {
return visibleLength;
}
protected boolean isPartialBlk() {
boolean isPartialBlk() {
return partialBlk;
}
protected void setOutBytes(byte[] bytes) {
this.outBytes = bytes;
}
protected byte[] getOutBytes() {
return outBytes;
}
protected int getBytesPerCRC() {
return bytesPerCRC;
}
protected DataChecksum.Type getCrcType() {
return crcType;
}
protected long getCrcPerBlock() {
return crcPerBlock;
}
protected int getChecksumSize() {
return checksumSize;
}
protected BlockMetadataHeader getHeader() {
BlockMetadataHeader getHeader() {
return header;
}
protected DataChecksum getChecksum() {
DataChecksum getChecksum() {
return checksum;
}
/**
* Perform the block checksum computing.
*
* @throws IOException
*/
abstract void compute() throws IOException;
/**
* Read block metadata header.
*
* @throws IOException
*/
protected void readHeader() throws IOException {
void readHeader() throws IOException {
//read metadata file
header = BlockMetadataHeader.readHeader(checksumIn);
checksum = header.getChecksum();
checksumSize = checksum.getChecksumSize();
bytesPerCRC = checksum.getBytesPerChecksum();
crcPerBlock = checksumSize <= 0 ? 0 :
setChecksumSize(checksum.getChecksumSize());
setBytesPerCRC(checksum.getBytesPerChecksum());
long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0 :
(metadataIn.getLength() -
BlockMetadataHeader.getHeaderSize()) / checksumSize;
crcType = checksum.getChecksumType();
BlockMetadataHeader.getHeaderSize()) / checksum.getChecksumSize();
setCrcPerBlock(crcPerBlock);
setCrcType(checksum.getChecksumType());
}
/**
* Calculate partial block checksum.
*
* @return
* @throws IOException
*/
protected byte[] crcPartialBlock() throws IOException {
int partialLength = (int) (requestLength % bytesPerCRC);
byte[] crcPartialBlock() throws IOException {
int partialLength = (int) (requestLength % getBytesPerCRC());
if (partialLength > 0) {
byte[] buf = new byte[partialLength];
final InputStream blockIn = datanode.data.getBlockInputStream(block,
final InputStream blockIn = getBlockInputStream(block,
requestLength - partialLength);
try {
// Get the CRC of the partialLength.
@ -181,7 +246,7 @@ protected byte[] crcPartialBlock() throws IOException {
IOUtils.closeStream(blockIn);
}
checksum.update(buf, 0, partialLength);
byte[] partialCrc = new byte[checksumSize];
byte[] partialCrc = new byte[getChecksumSize()];
checksum.writeValue(partialCrc, 0, true);
return partialCrc;
}
@ -229,7 +294,7 @@ private MD5Hash checksumWholeBlock() throws IOException {
}
private MD5Hash checksumPartialBlock() throws IOException {
byte[] buffer = new byte[4*1024];
byte[] buffer = new byte[4 * 1024];
MessageDigest digester = MD5Hash.getDigester();
long remaining = (getRequestLength() / getBytesPerCRC())
@ -251,4 +316,115 @@ private MD5Hash checksumPartialBlock() throws IOException {
return new MD5Hash(digester.digest());
}
}
}
/**
* Non-striped block group checksum computer for striped blocks.
*/
static class BlockGroupNonStripedChecksumComputer
extends AbstractBlockChecksumComputer {
private final ExtendedBlock blockGroup;
private final ErasureCodingPolicy ecPolicy;
private final DatanodeInfo[] datanodes;
private final Token<BlockTokenIdentifier>[] blockTokens;
private final DataOutputBuffer md5writer = new DataOutputBuffer();
BlockGroupNonStripedChecksumComputer(DataNode datanode,
StripedBlockInfo stripedBlockInfo)
throws IOException {
super(datanode);
this.blockGroup = stripedBlockInfo.getBlock();
this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
this.datanodes = stripedBlockInfo.getDatanodes();
this.blockTokens = stripedBlockInfo.getBlockTokens();
}
@Override
void compute() throws IOException {
for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) {
ExtendedBlock block =
StripedBlockUtil.constructInternalBlock(blockGroup,
ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx);
DatanodeInfo targetDatanode = datanodes[idx];
Token<BlockTokenIdentifier> blockToken = blockTokens[idx];
checksumBlock(block, idx, blockToken, targetDatanode);
}
MD5Hash md5out = MD5Hash.digest(md5writer.getData());
setOutBytes(md5out.getDigest());
}
private void checksumBlock(ExtendedBlock block, int blockIdx,
Token<BlockTokenIdentifier> blockToken,
DatanodeInfo targetDatanode) throws IOException {
int timeout = 3000;
try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode,
timeout, block, blockToken)) {
LOG.debug("write to {}: {}, block={}",
getDatanode(), Op.BLOCK_CHECKSUM, block);
// get block MD5
createSender(pair).blockChecksum(block, blockToken);
final DataTransferProtos.BlockOpResponseProto reply =
DataTransferProtos.BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(pair.in));
String logInfo = "for block " + block
+ " from datanode " + targetDatanode;
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
DataTransferProtos.OpBlockChecksumResponseProto checksumData =
reply.getChecksumResponse();
//read byte-per-checksum
final int bpc = checksumData.getBytesPerCrc();
if (blockIdx == 0) { //first block
setBytesPerCRC(bpc);
} else if (bpc != getBytesPerCRC()) {
throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ " but bytesPerCRC=" + getBytesPerCRC());
}
//read crc-per-block
final long cpb = checksumData.getCrcPerBlock();
if (blockIdx == 0) {
setCrcPerBlock(cpb);
}
//read md5
final MD5Hash md5 = new MD5Hash(
checksumData.getMd5().toByteArray());
md5.write(md5writer);
// read crc-type
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
ct = PBHelperClient.convert(checksumData.getCrcType());
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte");
ct = DataChecksum.Type.DEFAULT;
}
if (blockIdx == 0) { // first block
setCrcType(ct);
} else if (getCrcType() != DataChecksum.Type.MIXED &&
getCrcType() != ct) {
// if crc types are mixed in a file
setCrcType(DataChecksum.Type.MIXED);
}
if (LOG.isDebugEnabled()) {
if (blockIdx == 0) {
LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+ ", crcPerBlock=" + getCrcPerBlock());
}
LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
}
}
}
}
}

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@ -46,7 +47,9 @@
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockGroupNonStripedChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
@ -923,6 +926,46 @@ public void blockChecksum(ExtendedBlock block,
datanode.metrics.addBlockChecksumOp(elapsed());
}
@Override
public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
final Token<BlockTokenIdentifier> blockToken)
throws IOException {
updateCurrentThreadName("Getting checksum for block group" +
stripedBlockInfo.getBlock());
final DataOutputStream out = new DataOutputStream(getOutputStream());
checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken,
Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
AbstractBlockChecksumComputer maker =
new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo);
try {
maker.compute();
//write reply
BlockOpResponseProto.newBuilder()
.setStatus(SUCCESS)
.setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
.setBytesPerCrc(maker.getBytesPerCRC())
.setCrcPerBlock(maker.getCrcPerBlock())
.setMd5(ByteString.copyFrom(maker.getOutBytes()))
.setCrcType(PBHelperClient.convert(maker.getCrcType())))
.build()
.writeDelimitedTo(out);
out.flush();
} catch (IOException ioe) {
LOG.info("blockChecksum " + stripedBlockInfo.getBlock() +
" received exception " + ioe);
incrDatanodeNetworkErrors();
throw ioe;
} finally {
IOUtils.closeStream(out);
}
//update metrics
datanode.metrics.addBlockChecksumOp(elapsed());
}
@Override
public void copyBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {

View File

@ -0,0 +1,247 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
/**
* This test serves a prototype to demo the idea proposed so far. It creates two
* files using the same data, one is in replica mode, the other is in stripped
* layout. For simple, it assumes 6 data blocks in both files and the block size
* are the same.
*/
public class TestFileChecksum {
public static final Log LOG = LogFactory.getLog(TestFileChecksum.class);
private int dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
private int parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private Configuration conf;
private DFSClient client;
private int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
private int stripesPerBlock = 6;
private int blockSize = cellSize * stripesPerBlock;
private int numBlockGroups = 10;
private int stripSize = cellSize * dataBlocks;
private int blockGroupSize = stripesPerBlock * stripSize;
private int fileSize = numBlockGroups * blockGroupSize;
private String ecDir = "/striped";
private String stripedFile1 = ecDir + "/stripedFileChecksum1";
private String stripedFile2 = ecDir + "/stripedFileChecksum2";
private String replicatedFile = "/replicatedFileChecksum";
@Before
public void setup() throws IOException {
int numDNs = dataBlocks + parityBlocks + 2;
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
Path ecPath = new Path(ecDir);
cluster.getFileSystem().mkdir(ecPath, FsPermission.getDirDefault());
cluster.getFileSystem().getClient().setErasureCodingPolicy(ecDir, null);
fs = cluster.getFileSystem();
client = fs.getClient();
prepareTestFiles();
getDataNodeToKill(stripedFile1);
getDataNodeToKill(replicatedFile);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Test
public void testStripedFileChecksum1() throws Exception {
int length = 0;
testStripedFileChecksum(length, length + 10);
}
@Test
public void testStripedFileChecksum2() throws Exception {
int length = stripSize - 1;
testStripedFileChecksum(length, length - 10);
}
@Test
public void testStripedFileChecksum3() throws Exception {
int length = stripSize;
testStripedFileChecksum(length, length - 10);
}
@Test
public void testStripedFileChecksum4() throws Exception {
int length = stripSize + cellSize * 2;
testStripedFileChecksum(length, length - 10);
}
@Test
public void testStripedFileChecksum5() throws Exception {
int length = blockGroupSize;
testStripedFileChecksum(length, length - 10);
}
@Test
public void testStripedFileChecksum6() throws Exception {
int length = blockGroupSize + blockSize;
testStripedFileChecksum(length, length - 10);
}
@Test
public void testStripedFileChecksum7() throws Exception {
int length = -1; // whole file
testStripedFileChecksum(length, fileSize);
}
void testStripedFileChecksum(int range1, int range2) throws Exception {
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
range1, false);
FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2,
range1, false);
FileChecksum stripedFileChecksum3 = getFileChecksum(stripedFile2,
range2, false);
LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
LOG.info("stripedFileChecksum2:" + stripedFileChecksum2);
LOG.info("stripedFileChecksum3:" + stripedFileChecksum3);
Assert.assertTrue(stripedFileChecksum1.equals(stripedFileChecksum2));
if (range1 >=0 && range1 != range2) {
Assert.assertFalse(stripedFileChecksum1.equals(stripedFileChecksum3));
}
}
@Test
public void testStripedAndReplicatedFileChecksum() throws Exception {
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
10, false);
FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
10, false);
Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
}
/*
// TODO: allow datanode failure, HDFS-9833
@Test
public void testStripedAndReplicatedWithFailure() throws Exception {
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
10, true);
FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
10, true);
Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
}*/
private FileChecksum getFileChecksum(String filePath, int range,
boolean killDn) throws Exception {
int dnIdxToDie = -1;
if (killDn) {
dnIdxToDie = getDataNodeToKill(filePath);
DataNode dnToDie = cluster.getDataNodes().get(dnIdxToDie);
shutdownDataNode(dnToDie);
}
Path testPath = new Path(filePath);
FileChecksum fc;
if (range >= 0) {
fc = fs.getFileChecksum(testPath, range);
} else {
fc = fs.getFileChecksum(testPath);
}
if (dnIdxToDie != -1) {
cluster.restartDataNode(dnIdxToDie, true);
}
return fc;
}
void prepareTestFiles() throws IOException {
byte[] fileData = StripedFileTestUtil.generateBytes(fileSize);
String[] filePaths = new String[] {
stripedFile1, stripedFile2, replicatedFile
};
for (String filePath : filePaths) {
Path testPath = new Path(filePath);
DFSTestUtil.writeFile(fs, testPath, fileData);
}
}
void shutdownDataNode(DataNode dataNode) throws IOException {
/*
* Kill the datanode which contains one replica
* We need to make sure it dead in namenode: clear its update time and
* trigger NN to check heartbeat.
*/
dataNode.shutdown();
cluster.setDataNodeDead(dataNode.getDatanodeId());
}
/**
* Determine the datanode that hosts the first block of the file. For simple
* this just returns the first datanode as it's firstly tried.
*/
int getDataNodeToKill(String filePath) throws IOException {
LocatedBlocks locatedBlocks = client.getLocatedBlocks(filePath, 0);
LocatedBlock locatedBlock = locatedBlocks.get(0);
DatanodeInfo[] datanodes = locatedBlock.getLocations();
DatanodeInfo chosenDn = datanodes[0];
int idx = 0;
for (DataNode dn : cluster.getDataNodes()) {
if (dn.getInfoPort() == chosenDn.getInfoPort()) {
return idx;
}
idx++;
}
return -1;
}
}