diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index 7b243d8381..2e24acacfb 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hdds.scm.storage; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -206,6 +210,9 @@ private synchronized void readChunkFromContainer() throws IOException { readChunkResponse = ContainerProtocolCalls .readChunk(xceiverClient, chunkInfo, blockID, traceID); } catch (IOException e) { + if (e instanceof StorageContainerException) { + throw e; + } throw new IOException("Unexpected OzoneException: " + e.toString(), e); } ByteString byteString = readChunkResponse.getData(); @@ -215,6 +222,10 @@ private synchronized void readChunkFromContainer() throws IOException { .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size())); } + ChecksumData checksumData = ChecksumData.getFromProtoBuf( + chunkInfo.getChecksumData()); + Checksum.verifyChecksum(byteString, checksumData); + buffers = byteString.asReadOnlyByteBufferList(); bufferIndex = 0; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java index bdc6a83dc5..85f8646d5c 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -21,6 +21,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -74,6 +77,7 @@ public class ChunkOutputStream extends OutputStream { private final BlockData.Builder containerBlockData; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; + private final Checksum checksum; private final String streamId; private int chunkIndex; private int chunkSize; @@ -113,7 +117,8 @@ public class ChunkOutputStream extends OutputStream { public ChunkOutputStream(BlockID blockID, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String traceID, int chunkSize, long streamBufferFlushSize, - long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer) { + long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer, + Checksum checksum) { this.blockID = blockID; this.key = key; this.traceID = traceID; @@ -132,6 +137,7 @@ public ChunkOutputStream(BlockID 
blockID, String key, this.watchTimeout = watchTimeout; this.buffer = buffer; this.ioException = null; + this.checksum = checksum; // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); @@ -474,13 +480,20 @@ private void checkOpen() throws IOException { * information to be used later in putKey call. * * @throws IOException if there is an I/O error while performing the call + * @throws OzoneChecksumException if there is an error while computing + * checksum */ private void writeChunkToContainer(ByteBuffer chunk) throws IOException { int effectiveChunkSize = chunk.remaining(); ByteString data = ByteString.copyFrom(chunk); - ChunkInfo chunkInfo = ChunkInfo.newBuilder().setChunkName( - DigestUtils.md5Hex(key) + "_stream_" + streamId + "_chunk_" - + ++chunkIndex).setOffset(0).setLen(effectiveChunkSize).build(); + ChecksumData checksumData = checksum.computeChecksum(data); + ChunkInfo chunkInfo = ChunkInfo.newBuilder() + .setChunkName(DigestUtils.md5Hex(key) + "_stream_" + streamId + + "_chunk_" + ++chunkIndex) + .setOffset(0) + .setLen(effectiveChunkSize) + .setChecksumData(checksumData.getProtoBufMessage()) + .build(); // generate a unique requestId String requestId = traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 8d5c180954..879f77339c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -1,4 +1,4 @@ -/** + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -322,6 +322,18 @@ public final class OzoneConfigKeys { public static final String OZONE_CONTAINER_COPY_WORKDIR = "hdds.datanode.replication.work.dir"; + /** + * Config properties to set client side checksum properties. + */ + public static final String OZONE_CLIENT_CHECKSUM_TYPE = + "ozone.client.checksum.type"; + public static final String OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT = "SHA256"; + public static final String OZONE_CLIENT_BYTES_PER_CHECKSUM = + "ozone.client.bytes.per.checksum"; + public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT = + 1024 * 1024; // 1 MB + public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 256 * 1024; + /** * There is no need to instantiate this class. */ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java new file mode 100644 index 0000000000..83293e56d7 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.common; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Longs; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.PureJavaCrc32; +import org.apache.hadoop.util.PureJavaCrc32C; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to compute and verify checksums for chunks. + */ +public class Checksum { + + public static final Logger LOG = LoggerFactory.getLogger(Checksum.class); + + private final ChecksumType checksumType; + private final int bytesPerChecksum; + + private PureJavaCrc32 crc32Checksum; + private PureJavaCrc32C crc32cChecksum; + private MessageDigest sha; + + /** + * Constructs a Checksum object. + * @param type type of Checksum + * @param bytesPerChecksum number of bytes of data per checksum + */ + public Checksum(ChecksumType type, int bytesPerChecksum) { + this.checksumType = type; + this.bytesPerChecksum = bytesPerChecksum; + } + + /** + * Constructs a Checksum object with default ChecksumType and default + * BytesPerChecksum. + */ + @VisibleForTesting + public Checksum() { + this.checksumType = ChecksumType.valueOf( + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); + this.bytesPerChecksum = OzoneConfigKeys + .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT; + } + + /** + * Computes checksum for the given data. + * @param byteString input data in the form of ByteString. + * @return ChecksumData computed for input data. + */ + public ChecksumData computeChecksum(ByteString byteString) + throws OzoneChecksumException { + return computeChecksum(byteString.toByteArray()); + } + + /** + * Computes checksum for the given data. + * @param data input data in the form of byte array. + * @return ChecksumData computed for input data. + */ + public ChecksumData computeChecksum(byte[] data) + throws OzoneChecksumException { + ChecksumData checksumData = new ChecksumData(this.checksumType, this + .bytesPerChecksum); + if (checksumType == ChecksumType.NONE) { + // Since type is set to NONE, we do not need to compute the checksums + return checksumData; + } + + switch (checksumType) { + case CRC32: + crc32Checksum = new PureJavaCrc32(); + break; + case CRC32C: + crc32cChecksum = new PureJavaCrc32C(); + break; + case SHA256: + try { + sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + } catch (NoSuchAlgorithmException e) { + throw new OzoneChecksumException(OzoneConsts.FILE_HASH, e); + } + break; + case MD5: + break; + default: + throw new OzoneChecksumException(checksumType); + } + + // Compute number of checksums needed for given data length based on bytes + // per checksum.
+ int dataSize = data.length; + int numChecksums = (dataSize + bytesPerChecksum - 1) / bytesPerChecksum; + + // Checksum is computed for each bytesPerChecksum number of bytes of data + // starting at offset 0. The last checksum might be computed for the + // remaining data with length less than bytesPerChecksum. + List checksumList = new ArrayList<>(numChecksums); + for (int index = 0; index < numChecksums; index++) { + checksumList.add(computeChecksumAtIndex(data, index)); + } + checksumData.setChecksums(checksumList); + + return checksumData; + } + + /** + * Computes checksum based on checksumType for a data block at given index + * and a max length of bytesPerChecksum. + * @param data input data + * @param index index to compute the offset from where data must be read + * @return computed checksum ByteString + * @throws OzoneChecksumException thrown when ChecksumType is not recognized + */ + private ByteString computeChecksumAtIndex(byte[] data, int index) + throws OzoneChecksumException { + int offset = index * bytesPerChecksum; + int len = bytesPerChecksum; + if ((offset + len) > data.length) { + len = data.length - offset; + } + byte[] checksumBytes = null; + switch (checksumType) { + case CRC32: + checksumBytes = computeCRC32Checksum(data, offset, len); + break; + case CRC32C: + checksumBytes = computeCRC32CChecksum(data, offset, len); + break; + case SHA256: + checksumBytes = computeSHA256Checksum(data, offset, len); + break; + case MD5: + checksumBytes = computeMD5Checksum(data, offset, len); + break; + default: + throw new OzoneChecksumException(checksumType); + } + + return ByteString.copyFrom(checksumBytes); + } + + /** + * Computes CRC32 checksum. + */ + private byte[] computeCRC32Checksum(byte[] data, int offset, int len) { + crc32Checksum.reset(); + crc32Checksum.update(data, offset, len); + return Longs.toByteArray(crc32Checksum.getValue()); + } + + /** + * Computes CRC32C checksum. + */ + private byte[] computeCRC32CChecksum(byte[] data, int offset, int len) { + crc32cChecksum.reset(); + crc32cChecksum.update(data, offset, len); + return Longs.toByteArray(crc32cChecksum.getValue()); + } + + /** + * Computes SHA-256 checksum. + */ + private byte[] computeSHA256Checksum(byte[] data, int offset, int len) { + sha.reset(); + sha.update(data, offset, len); + return sha.digest(); + } + + /** + * Computes MD5 checksum. + */ + private byte[] computeMD5Checksum(byte[] data, int offset, int len) { + MD5Hash md5out = MD5Hash.digest(data, offset, len); + return md5out.getDigest(); + } + + /** + * Computes the ChecksumData for the input data and verifies that it + * matches with that of the input checksumData. + * @param byteString input data + * @param checksumData checksumData to match with + * @throws OzoneChecksumException is thrown if checksums do not match + */ + public static boolean verifyChecksum( + ByteString byteString, ChecksumData checksumData) + throws OzoneChecksumException { + return verifyChecksum(byteString.toByteArray(), checksumData); + } + + /** + * Computes the ChecksumData for the input data and verifies that it + * matches with that of the input checksumData. + * @param data input data + * @param checksumData checksumData to match with + * @throws OzoneChecksumException is thrown if checksums do not match + */ + public static boolean verifyChecksum(byte[] data, ChecksumData checksumData) + throws OzoneChecksumException { + ChecksumType checksumType = checksumData.getChecksumType(); + if (checksumType == ChecksumType.NONE) { + // Checksum is set to NONE. 
No further verification is required. + return true; + } + + int bytesPerChecksum = checksumData.getBytesPerChecksum(); + Checksum checksum = new Checksum(checksumType, bytesPerChecksum); + ChecksumData computedChecksumData = checksum.computeChecksum(data); + + return checksumData.verifyChecksumDataMatches(computedChecksumData); + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java new file mode 100644 index 0000000000..dafa0e32a2 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.common; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.util.List; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ChecksumType; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + +/** + * Java class that represents Checksum ProtoBuf class. This helper class allows + * us to convert to and from protobuf to normal java. + */ +public class ChecksumData { + + private ChecksumType type; + // Checksum will be computed for every bytesPerChecksum number of bytes and + // stored sequentially in checksumList + private int bytesPerChecksum; + private List checksums; + + public ChecksumData(ChecksumType checksumType, int bytesPerChecksum) { + this.type = checksumType; + this.bytesPerChecksum = bytesPerChecksum; + this.checksums = Lists.newArrayList(); + } + + /** + * Getter method for checksumType. + */ + public ChecksumType getChecksumType() { + return this.type; + } + + /** + * Getter method for bytesPerChecksum. + */ + public int getBytesPerChecksum() { + return this.bytesPerChecksum; + } + + /** + * Getter method for checksums. + */ + @VisibleForTesting + public List getChecksums() { + return this.checksums; + } + + /** + * Setter method for checksums. + * @param checksumList list of checksums + */ + public void setChecksums(List checksumList) { + this.checksums.clear(); + this.checksums.addAll(checksumList); + } + + /** + * Construct the Checksum ProtoBuf message. 
+ * @return Checksum ProtoBuf message + */ + public ContainerProtos.ChecksumData getProtoBufMessage() { + ContainerProtos.ChecksumData.Builder checksumProtoBuilder = + ContainerProtos.ChecksumData.newBuilder() + .setType(this.type) + .setBytesPerChecksum(this.bytesPerChecksum); + + checksumProtoBuilder.addAllChecksums(checksums); + + return checksumProtoBuilder.build(); + } + + /** + * Constructs a ChecksumData object from the Checksum ProtoBuf message. + * @param checksumDataProto Checksum ProtoBuf message + * @return ChecksumData object representing the proto + */ + public static ChecksumData getFromProtoBuf( + ContainerProtos.ChecksumData checksumDataProto) { + Preconditions.checkNotNull(checksumDataProto); + + ChecksumData checksumData = new ChecksumData( + checksumDataProto.getType(), checksumDataProto.getBytesPerChecksum()); + + if (checksumDataProto.getChecksumsCount() != 0) { + checksumData.setChecksums(checksumDataProto.getChecksumsList()); + } + + return checksumData; + } + + /** + * Verify that this ChecksumData matches the input ChecksumData. + * @param that the ChecksumData to match with + * @return true if checksums match + * @throws OzoneChecksumException + */ + public boolean verifyChecksumDataMatches(ChecksumData that) throws + OzoneChecksumException { + + // pre checks + if (this.checksums.size() == 0) { + throw new OzoneChecksumException("Original checksumData has no " + + "checksums"); + } + + if (that.checksums.size() == 0) { + throw new OzoneChecksumException("Computed checksumData has no " + + "checksums"); + } + + if (this.checksums.size() != that.checksums.size()) { + throw new OzoneChecksumException("Original and computed checksumData " + + "have a different number of checksums"); + } + + // Verify that checksum matches at each index + for (int index = 0; index < this.checksums.size(); index++) { + if (!matchChecksumAtIndex(this.checksums.get(index), + that.checksums.get(index))) { + // checksum mismatch. throw exception.
+ throw new OzoneChecksumException(index); + } + } + return true; + } + + private static boolean matchChecksumAtIndex( + ByteString expectedChecksumAtIndex, ByteString computedChecksumAtIndex) { + return expectedChecksumAtIndex.equals(computedChecksumAtIndex); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ChecksumData)) { + return false; + } + + ChecksumData that = (ChecksumData) obj; + + if (!this.type.equals(that.getChecksumType())) { + return false; + } + if (this.bytesPerChecksum != that.getBytesPerChecksum()) { + return false; + } + if (this.checksums.size() != that.checksums.size()) { + return false; + } + + // Match checksum at each index + for (int index = 0; index < this.checksums.size(); index++) { + if (!matchChecksumAtIndex(this.checksums.get(index), + that.checksums.get(index))) { + return false; + } + } + return true; + } + + @Override + public int hashCode() { + HashCodeBuilder hc = new HashCodeBuilder(); + hc.append(type); + hc.append(bytesPerChecksum); + hc.append(checksums.toArray()); + return hc.toHashCode(); + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/OzoneChecksumException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/OzoneChecksumException.java new file mode 100644 index 0000000000..20e40af09f --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/OzoneChecksumException.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.common; + +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; + +/** Thrown for checksum errors. */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class OzoneChecksumException extends IOException { + + /** + * OzoneChecksumException to throw when checksum verfication fails. + * @param index checksum list index at which checksum match failed + */ + public OzoneChecksumException(int index) { + super(String.format("Checksum mismatch at index %d", index)); + } + + /** + * OzoneChecksumException to throw when unrecognized checksumType is given. + * @param unrecognizedChecksumType + */ + public OzoneChecksumException( + ContainerProtos.ChecksumType unrecognizedChecksumType) { + super(String.format("Unrecognized ChecksumType: %s", + unrecognizedChecksumType)); + } + + /** + * OzoneChecksumException to wrap around NoSuchAlgorithmException. 
+ * @param algorithm name of algorithm + * @param ex original exception thrown + */ + public OzoneChecksumException( + String algorithm, NoSuchAlgorithmException ex) { + super(String.format("NoSuchAlgorithmException thrown while computing " + + "SHA-256 checksum using algorithm %s", algorithm), ex); + } + + /** + * OzoneChecksumException to throw with custom message. + */ + public OzoneChecksumException(String message) { + super(message); + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java index 21916b585e..d75f10f19c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.ozone.common.ChecksumData; /** * Java class that represents ChunkInfo ProtoBuf class. This helper class allows @@ -33,7 +34,7 @@ public class ChunkInfo { private final String chunkName; private final long offset; private final long len; - private String checksum; + private ChecksumData checksumData; private final Map metadata; @@ -86,10 +87,9 @@ public static ChunkInfo getFromProtoBuf(ContainerProtos.ChunkInfo info) info.getMetadata(x).getValue()); } + chunkInfo.setChecksumData( + ChecksumData.getFromProtoBuf(info.getChecksumData())); - if (info.hasChecksum()) { - chunkInfo.setChecksum(info.getChecksum()); - } return chunkInfo; } @@ -105,9 +105,7 @@ public ContainerProtos.ChunkInfo getProtoBufMessage() { builder.setChunkName(this.getChunkName()); builder.setOffset(this.getOffset()); builder.setLen(this.getLen()); - if (this.getChecksum() != null && !this.getChecksum().isEmpty()) { - builder.setChecksum(this.getChecksum()); - } + builder.setChecksumData(this.checksumData.getProtoBufMessage()); for (Map.Entry entry : metadata.entrySet()) { ContainerProtos.KeyValue.Builder keyValBuilder = @@ -147,21 +145,17 @@ public long getLen() { } /** - * Returns the SHA256 value of this chunk. - * - * @return - Hash String + * Returns the checksumData of this chunk. */ - public String getChecksum() { - return checksum; + public ChecksumData getChecksumData() { + return checksumData; } /** - * Sets the Hash value of this chunk. - * - * @param checksum - Hash String. + * Sets the checksums of this chunk. 
*/ - public void setChecksum(String checksum) { - this.checksum = checksum; + public void setChecksumData(ChecksumData cData) { + this.checksumData = cData; } /** diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 9e94dd1a5f..3695b6b306 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -355,8 +355,22 @@ message ChunkInfo { required string chunkName = 1; required uint64 offset = 2; required uint64 len = 3; - optional string checksum = 4; - repeated KeyValue metadata = 5; + repeated KeyValue metadata = 4; + required ChecksumData checksumData =5; +} + +message ChecksumData { + required ChecksumType type = 1; + required uint32 bytesPerChecksum = 2; + repeated bytes checksums = 3; +} + +enum ChecksumType { + NONE = 1; + CRC32 = 2; + CRC32C = 3; + SHA256 = 4; + MD5 = 5; } enum Stage { diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java new file mode 100644 index 0000000000..819c29fd61 --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.common; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for {@link Checksum} class. + */ +public class TestChecksum { + + private static final int BYTES_PER_CHECKSUM = 10; + private static final ContainerProtos.ChecksumType CHECKSUM_TYPE_DEFAULT = + ContainerProtos.ChecksumType.SHA256; + + private Checksum getChecksum(ContainerProtos.ChecksumType type) { + if (type == null) { + type = CHECKSUM_TYPE_DEFAULT; + } + return new Checksum(type, BYTES_PER_CHECKSUM); + } + + /** + * Tests {@link Checksum#verifyChecksum(byte[], ChecksumData)}. + */ + @Test + public void testVerifyChecksum() throws Exception { + Checksum checksum = getChecksum(null); + int dataLen = 55; + byte[] data = RandomStringUtils.randomAlphabetic(dataLen).getBytes(); + + ChecksumData checksumData = checksum.computeChecksum(data); + + // A checksum is calculate for each bytesPerChecksum number of bytes in + // the data. Since that value is 10 here and the data length is 55, we + // should have 6 checksums in checksumData. 
+ Assert.assertEquals(6, checksumData.getChecksums().size()); + + // Checksum verification should pass + Assert.assertTrue("Checksum mismatch", + Checksum.verifyChecksum(data, checksumData)); + } + + /** + * Tests that if data is modified, then the checksums should not match. + */ + @Test + public void testIncorrectChecksum() throws Exception { + Checksum checksum = getChecksum(null); + byte[] data = RandomStringUtils.randomAlphabetic(55).getBytes(); + ChecksumData originalChecksumData = checksum.computeChecksum(data); + + // Change the data and check if new checksum matches the original checksum. + // Modifying one byte of data should be enough for the checksum data to + // mismatch + data[50] = (byte) (data[50]+1); + ChecksumData newChecksumData = checksum.computeChecksum(data); + Assert.assertNotEquals("Checksums should not match for different data", + originalChecksumData, newChecksumData); + } + + /** + * Tests that checksums calculated using two different checksumTypes should + * not match. + */ + @Test + public void testChecksumMismatchForDifferentChecksumTypes() throws Exception { + byte[] data = RandomStringUtils.randomAlphabetic(55).getBytes(); + + // Checksum1 of type SHA-256 + Checksum checksum1 = getChecksum(null); + ChecksumData checksumData1 = checksum1.computeChecksum(data); + + // Checksum2 of type CRC32 + Checksum checksum2 = getChecksum(ContainerProtos.ChecksumType.CRC32); + ChecksumData checksumData2 = checksum2.computeChecksum(data); + + // The two checksums should not match as they have different types + Assert.assertNotEquals( + "Checksums should not match for different checksum types", + checksumData1, checksumData2); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java index 2ee82cb8fe..72398438f2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java @@ -19,8 +19,6 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers; import com.google.common.base.Preconditions; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -47,7 +45,6 @@ import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.FileLock; import java.nio.file.StandardOpenOption; -import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.concurrent.ExecutionException; @@ -90,11 +87,6 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo, FileLock lock = null; try { - if (chunkInfo.getChecksum() != null && - !chunkInfo.getChecksum().isEmpty()) { - verifyChecksum(chunkInfo, data, log); - } - long writeTimeStart = Time.monotonicNow(); file = AsynchronousFileChannel.open(chunkFile.toPath(), @@ -154,10 +146,8 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo, * @throws InterruptedException */ public static ByteBuffer readData(File chunkFile, ChunkInfo data, - VolumeIOStats volumeIOStats) - throws - StorageContainerException, ExecutionException, InterruptedException, - NoSuchAlgorithmException { + VolumeIOStats volumeIOStats) throws
StorageContainerException, + ExecutionException, InterruptedException { Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); if (!chunkFile.exists()) { @@ -184,10 +174,7 @@ public static ByteBuffer readData(File chunkFile, ChunkInfo data, volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime); volumeIOStats.incReadOpCount(); volumeIOStats.incReadBytes(data.getLen()); - if (data.getChecksum() != null && !data.getChecksum().isEmpty()) { - buf.rewind(); - verifyChecksum(data, buf, log); - } + return buf; } catch (IOException e) { throw new StorageContainerException(e, IO_EXCEPTION); @@ -205,30 +192,6 @@ public static ByteBuffer readData(File chunkFile, ChunkInfo data, } } - /** - * Verifies the checksum of a chunk against the data buffer. - * - * @param chunkInfo - Chunk Info. - * @param data - data buffer - * @param log - log - * @throws NoSuchAlgorithmException - * @throws StorageContainerException - */ - private static void verifyChecksum(ChunkInfo chunkInfo, ByteBuffer data, - Logger log) throws NoSuchAlgorithmException, StorageContainerException { - MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - sha.update(data); - data.rewind(); - if (!Hex.encodeHexString(sha.digest()).equals( - chunkInfo.getChecksum())) { - log.error("Checksum mismatch. Provided: {} , computed: {}", - chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest())); - throw new StorageContainerException("Checksum mismatch. Provided: " + - chunkInfo.getChecksum() + " , computed: " + - DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH); - } - } - /** * Validates chunk data and returns a file object to Chunk File that we are * expected to write data to. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java index cdd19dff0b..a1c3d014dc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java @@ -200,10 +200,6 @@ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info) containerData.incrReadBytes(length); return data.array(); } - } catch(NoSuchAlgorithmException ex) { - LOG.error("read data failed. error: {}", ex); - throw new StorageContainerException("Internal error: ", - ex, NO_SUCH_ALGORITHM); } catch (ExecutionException ex) { LOG.error("read data failed. error: {}", ex); throw new StorageContainerException("Internal error: ", diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java index abb90596a7..717dd05ff9 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java @@ -221,22 +221,6 @@ public void testDeleteChunkUnsupportedRequest() throws Exception { } } - @Test - public void testWriteChunkChecksumMismatch() throws Exception { - try { - chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID - .getLocalID(), 0), 0, data.length); - //Setting checksum to some value. 
- chunkInfo.setChecksum("some garbage"); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); - fail("testWriteChunkChecksumMismatch failed"); - } catch (StorageContainerException ex) { - GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex); - assertEquals(ContainerProtos.Result.CHECKSUM_MISMATCH, ex.getResult()); - } - } - @Test public void testReadChunkFileNotExists() throws Exception { try { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 79b0fe794d..4d76395827 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -83,6 +84,7 @@ public class ChunkGroupOutputStream extends OutputStream { private final long watchTimeout; private final long blockSize; private ByteBuffer buffer; + private final Checksum checksum; /** * A constructor for testing purpose only. */ @@ -102,6 +104,7 @@ public ChunkGroupOutputStream() { buffer = ByteBuffer.allocate(1); watchTimeout = 0; blockSize = 0; + this.checksum = new Checksum(); } /** @@ -113,7 +116,8 @@ public ChunkGroupOutputStream() { */ @VisibleForTesting public void addStream(OutputStream outputStream, long length) { - streamEntries.add(new ChunkOutputStreamEntry(outputStream, length)); + streamEntries.add( + new ChunkOutputStreamEntry(outputStream, length, checksum)); } @VisibleForTesting @@ -145,7 +149,8 @@ public ChunkGroupOutputStream(OpenKeySession handler, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize, String requestId, ReplicationFactor factor, ReplicationType type, - long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout) { + long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout, + Checksum checksum) { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; this.omClient = omClient; @@ -163,6 +168,7 @@ public ChunkGroupOutputStream(OpenKeySession handler, this.streamBufferMaxSize = bufferMaxSize; this.blockSize = size; this.watchTimeout = watchTimeout; + this.checksum = checksum; Preconditions.checkState(chunkSize > 0); Preconditions.checkState(streamBufferFlushSize > 0); @@ -216,7 +222,7 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, chunkSize, subKeyInfo.getLength(), streamBufferFlushSize, - streamBufferMaxSize, watchTimeout, buffer)); + streamBufferMaxSize, watchTimeout, buffer, checksum)); } @VisibleForTesting @@ -534,6 +540,7 @@ public static class Builder { private long streamBufferMaxSize; private long blockSize; private long watchTimeout; + private Checksum 
checksum; public Builder setHandler(OpenKeySession handler) { this.openHandler = handler; @@ -597,10 +604,15 @@ public Builder setWatchTimeout(long timeout) { return this; } + public Builder setChecksum(Checksum checksumObj){ + this.checksum = checksumObj; + return this; + } + public ChunkGroupOutputStream build() throws IOException { return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, - streamBufferMaxSize, blockSize, watchTimeout); + streamBufferMaxSize, blockSize, watchTimeout, checksum); } } @@ -610,6 +622,7 @@ private static class ChunkOutputStreamEntry extends OutputStream { private final String key; private final XceiverClientManager xceiverClientManager; private final XceiverClientSpi xceiverClient; + private final Checksum checksum; private final String requestId; private final int chunkSize; // total number of bytes that should be written to this stream @@ -626,7 +639,7 @@ private static class ChunkOutputStreamEntry extends OutputStream { XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String requestId, int chunkSize, long length, long streamBufferFlushSize, long streamBufferMaxSize, - long watchTimeout, ByteBuffer buffer) { + long watchTimeout, ByteBuffer buffer, Checksum checksum) { this.outputStream = null; this.blockID = blockID; this.key = key; @@ -641,6 +654,7 @@ private static class ChunkOutputStreamEntry extends OutputStream { this.streamBufferMaxSize = streamBufferMaxSize; this.watchTimeout = watchTimeout; this.buffer = buffer; + this.checksum = checksum; } /** @@ -648,7 +662,8 @@ private static class ChunkOutputStreamEntry extends OutputStream { * @param outputStream a existing writable output stream * @param length the length of data to write to the stream */ - ChunkOutputStreamEntry(OutputStream outputStream, long length) { + ChunkOutputStreamEntry(OutputStream outputStream, long length, + Checksum checksum) { this.outputStream = outputStream; this.blockID = null; this.key = null; @@ -663,6 +678,7 @@ private static class ChunkOutputStreamEntry extends OutputStream { streamBufferMaxSize = 0; buffer = null; watchTimeout = 0; + this.checksum = checksum; } long getLength() { @@ -678,7 +694,7 @@ private void checkStream() { this.outputStream = new ChunkOutputStream(blockID, key, xceiverClientManager, xceiverClient, requestId, chunkSize, streamBufferFlushSize, - streamBufferMaxSize, watchTimeout, buffer); + streamBufferMaxSize, watchTimeout, buffer, checksum); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 3a0f47580e..65adbfac00 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -23,6 +23,8 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.StorageType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ChecksumType; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.Client; @@ -42,6 +44,7 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; +import 
org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; @@ -92,6 +95,7 @@ public class RpcClient implements ClientProtocol { ozoneManagerClient; private final XceiverClientManager xceiverClientManager; private final int chunkSize; + private final Checksum checksum; private final UserGroupInformation ugi; private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; @@ -166,6 +170,26 @@ public RpcClient(Configuration conf) throws IOException { conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + + int configuredChecksumSize = conf.getInt( + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT); + int checksumSize; + if(configuredChecksumSize < + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) { + LOG.warn("The checksum size ({}) is not allowed to be less than the " + + "minimum size ({}), resetting to the minimum size.", + configuredChecksumSize, + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE); + checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE; + } else { + checksumSize = configuredChecksumSize; + } + String checksumTypeStr = conf.get( + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); + ChecksumType checksumType = ChecksumType.valueOf(checksumTypeStr); + this.checksum = new Checksum(checksumType, checksumSize); } private InetSocketAddress getScmAddressForClient() throws IOException { @@ -489,6 +513,7 @@ public OzoneOutputStream createKey( .setStreamBufferMaxSize(streamBufferMaxSize) .setWatchTimeout(watchTimeout) .setBlockSize(blockSize) + .setChecksum(checksum) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 1ed5f676f0..addd8ad4bc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.client.rpc; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.protocol.StorageType; @@ -39,9 +40,13 @@ import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.keyvalue.KeyValueBlockIterator; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.helpers + .KeyValueContainerLocationUtil; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; 
@@ -87,6 +92,8 @@ public class TestOzoneRpcClient { private static StorageContainerLocationProtocolClientSideTranslatorPB storageContainerLocationClient; + private static final String SCM_ID = UUID.randomUUID().toString(); + /** * Create a MiniOzoneCluster for testing. *

@@ -98,7 +105,10 @@ public class TestOzoneRpcClient { public static void init() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1); - cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10).build(); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(10) + .setScmId(SCM_ID) + .build(); cluster.waitForClusterToBeReady(); ozClient = OzoneClientFactory.getRpcClient(conf); store = ozClient.getObjectStore(); @@ -821,6 +831,92 @@ public void testGetKeyDetails() throws IOException, OzoneException { } } + /** + * Tests that reading a corrupted chunk file throws a checksum exception. + * @throws IOException + */ + @Test + public void testReadKeyWithCorruptedData() throws IOException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + String value = "sample value"; + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + String keyName = UUID.randomUUID().toString(); + + // Write data into a key + OzoneOutputStream out = bucket.createKey(keyName, + value.getBytes().length, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE); + out.write(value.getBytes()); + out.close(); + + // We need to find the location of the chunk file corresponding to the + // data we just wrote. + OzoneKey key = bucket.getKey(keyName); + long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0) + .getContainerID(); + long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0) + .getLocalID(); + + // Get the container by traversing the datanodes. At least one of the + // datanodes must have this container. + Container container = null; + for (HddsDatanodeService hddsDatanode : cluster.getHddsDatanodes()) { + container = hddsDatanode.getDatanodeStateMachine().getContainer() + .getContainerSet().getContainer(containerID); + if (container != null) { + break; + } + } + Assert.assertNotNull("Container not found", container); + + // From the containerData, get the block iterator for all the blocks in + // the container. + KeyValueContainerData containerData = + (KeyValueContainerData) container.getContainerData(); + String containerPath = new File(containerData.getMetadataPath()) + .getParent(); + KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator( + containerID, new File(containerPath)); + + // Find the block corresponding to the key we put. We use the localID of + // the BlockData to identify our key. + BlockData blockData = null; + while (keyValueBlockIterator.hasNext()) { + blockData = keyValueBlockIterator.nextBlock(); + if (blockData.getBlockID().getLocalID() == localID) { + break; + } + } + Assert.assertNotNull("Block not found", blockData); + + // Get the location of the chunk file + String chunkName = blockData.getChunks().get(0).getChunkName(); + String containerBaseDir = container.getContainerData().getVolume() + .getHddsRootDir().getPath(); + File chunksLocationPath = KeyValueContainerLocationUtil + .getChunksLocationPath(containerBaseDir, SCM_ID, containerID); + File chunkFile = new File(chunksLocationPath, chunkName); + + // Corrupt the contents of the chunk file + String newData = new String("corrupted data"); + FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes()); + + // Try reading the key. Since the chunk file is corrupted, it should + // throw a checksum mismatch exception.
+ try { + OzoneInputStream is = bucket.readKey(keyName); + is.read(new byte[100]); + Assert.fail("Reading corrupted data should fail."); + } catch (OzoneChecksumException e) { + GenericTestUtils.assertExceptionContains("Checksum mismatch", e); + } + } + @Test public void testDeleteKey() throws IOException, OzoneException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 7e9bab5c0d..82c3ab8862 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container; import com.google.common.base.Preconditions; +import java.security.MessageDigest; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.ReplicationType; @@ -26,13 +27,15 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -42,7 +45,6 @@ .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; -import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -52,7 +54,6 @@ import java.io.IOException; import java.net.ServerSocket; -import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; @@ -180,10 +181,9 @@ public static byte[] getData(int len) { * @throws NoSuchAlgorithmException */ public static void setDataChecksum(ChunkInfo info, byte[] data) - throws NoSuchAlgorithmException { - MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - sha.update(data); - info.setChecksum(Hex.encodeHexString(sha.digest())); + throws OzoneChecksumException { + Checksum checksum = new Checksum(); + info.setChecksumData(checksum.computeChecksum(data)); } /** @@ -197,8 +197,7 @@ public static void setDataChecksum(ChunkInfo info, byte[] data) * @throws NoSuchAlgorithmException */ public static ContainerCommandRequestProto getWriteChunkRequest( - Pipeline pipeline, BlockID blockID, int datalen) - throws IOException, NoSuchAlgorithmException { + Pipeline pipeline, BlockID blockID, int datalen) throws IOException { LOG.trace("writeChunk {} (blockID={}) to pipeline=", datalen, blockID, pipeline); 
ContainerProtos.WriteChunkRequestProto.Builder writeRequest = diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java index c2fb2ea063..0db047f09e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java @@ -140,7 +140,8 @@ private void createToDeleteBlocks(ContainerSet containerSet, .setChunkName(chunk.getAbsolutePath()) .setLen(0) .setOffset(0) - .setChecksum("") + .setChecksumData( + ContainerProtos.ChecksumData.getDefaultInstance()) .build(); chunks.add(info); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index a48d6378c7..fb7b0c5c09 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; @@ -320,8 +322,7 @@ public void testListContainer() throws IOException { Assert.assertTrue(testMap.isEmpty()); } - private ChunkInfo writeChunkHelper(BlockID blockID) - throws IOException, NoSuchAlgorithmException { + private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException { final int datalen = 1024; long testContainerID = blockID.getContainerID(); Container container = containerSet.getContainer(testContainerID); @@ -360,8 +361,7 @@ public void testWriteChunk() throws IOException, * @throws NoSuchAlgorithmException */ @Test - public void testWritReadManyChunks() throws IOException, - NoSuchAlgorithmException { + public void testWritReadManyChunks() throws IOException { final int datalen = 1024; final int chunkCount = 1024; @@ -386,32 +386,29 @@ public void testWritReadManyChunks() throws IOException, Path dataDir = Paths.get(cNewData.getChunksPath()); String globFormat = String.format("%s.data.*", blockID.getLocalID()); - MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); // Read chunk via file system and verify. 
int count = 0; try (DirectoryStream stream = Files.newDirectoryStream(dataDir, globFormat)) { + Checksum checksum = new Checksum(); + for (Path fname : stream) { - sha.update(FileUtils.readFileToByteArray(fname.toFile())); - String val = Hex.encodeHexString(sha.digest()); + ChecksumData checksumData = checksum + .computeChecksum(FileUtils.readFileToByteArray(fname.toFile())); Assert.assertEquals(fileHashMap.get(fname.getFileName().toString()) - .getChecksum(), val); + .getChecksumData(), checksumData); count++; - sha.reset(); } Assert.assertEquals(chunkCount, count); // Read chunk via ReadChunk call. - sha.reset(); for (int x = 0; x < chunkCount; x++) { String fileName = String.format("%s.data.%d", blockID.getLocalID(), x); ChunkInfo info = fileHashMap.get(fileName); byte[] data = chunkManager.readChunk(container, blockID, info); - sha.update(data); - Assert.assertEquals(Hex.encodeHexString(sha.digest()), - info.getChecksum()); - sha.reset(); + ChecksumData checksumData = checksum.computeChecksum(data); + Assert.assertEquals(info.getChecksumData(), checksumData); } } } @@ -571,7 +568,7 @@ public void testPutBlock() throws IOException, NoSuchAlgorithmException { getBlock(container, blockData.getBlockID()); ChunkInfo readChunk = ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0)); - Assert.assertEquals(info.getChecksum(), readChunk.getChecksum()); + Assert.assertEquals(info.getChecksumData(), readChunk.getChecksumData()); } /** @@ -629,7 +626,7 @@ public void testPutBlockWithInvalidBCSId() getBlock(container, blockData.getBlockID()); ChunkInfo readChunk = ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0)); - Assert.assertEquals(info.getChecksum(), readChunk.getChecksum()); + Assert.assertEquals(info.getChecksumData(), readChunk.getChecksumData()); } /** @@ -684,7 +681,8 @@ public void testPutBlockWithLotsOfChunks() throws IOException, ChunkInfo readChunk = ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData .getChunks().size() - 1)); - Assert.assertEquals(lastChunk.getChecksum(), readChunk.getChecksum()); + Assert.assertEquals( + lastChunk.getChecksumData(), readChunk.getChecksumData()); } /** diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index fd100553de..26a4ac13b8 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -24,7 +24,9 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.ozone.client.io.LengthInputStream; +import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; @@ -84,6 +86,7 @@ public final class DistributedStorageHandler implements StorageHandler { private final long streamBufferMaxSize; private final long watchTimeout; private final long blockSize; + private final Checksum checksum; /** * Creates a new DistributedStorageHandler. 
@@ -128,6 +131,27 @@ public DistributedStorageHandler(OzoneConfiguration conf, conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + + int configuredChecksumSize = conf.getInt( + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT); + int checksumSize; + if(configuredChecksumSize < + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) { + LOG.warn("The checksum size ({}) is not allowed to be less than the " + + "minimum size ({}), resetting to the minimum size.", + configuredChecksumSize, + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE); + checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE; + } else { + checksumSize = configuredChecksumSize; + } + String checksumTypeStr = conf.get( + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); + ContainerProtos.ChecksumType checksumType = ContainerProtos.ChecksumType + .valueOf(checksumTypeStr); + this.checksum = new Checksum(checksumType, checksumSize); } @Override @@ -426,6 +450,7 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException, .setStreamBufferMaxSize(streamBufferMaxSize) .setBlockSize(blockSize) .setWatchTimeout(watchTimeout) + .setChecksum(checksum) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(),
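For context on how the pieces introduced in this patch fit together, here is a minimal usage sketch (not part of the patch): it computes a ChecksumData for a byte buffer the way ChunkOutputStream does before building the ChunkInfo proto, round-trips it through the new ChecksumData protobuf message, and verifies it the way ChunkInputStream does after a ReadChunk call. The checksum type, bytes-per-checksum value, and sample data below are illustrative assumptions only.

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;

public class ChecksumUsageSketch {
  public static void main(String[] args) throws OzoneChecksumException {
    byte[] data = "sample chunk contents".getBytes();

    // Write path: one checksum is computed per bytesPerChecksum bytes of data
    // (256 KB here, the configured minimum; CRC32 chosen for illustration).
    Checksum checksum = new Checksum(ChecksumType.CRC32, 256 * 1024);
    ChecksumData checksumData = checksum.computeChecksum(data);

    // The checksums travel with the chunk via the new checksumData field of
    // the ChunkInfo proto; here we only round-trip the proto message itself.
    ContainerProtos.ChecksumData proto = checksumData.getProtoBufMessage();
    ChecksumData received = ChecksumData.getFromProtoBuf(proto);

    // Read path: recompute over the received bytes and compare. A mismatch
    // (e.g. a corrupted chunk file) surfaces as an OzoneChecksumException.
    Checksum.verifyChecksum(data, received);
  }
}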