diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java new file mode 100644 index 0000000000..e1ed5cbcfc --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Options.ChecksumOpt; +import org.apache.hadoop.util.CrcUtil; +import org.apache.hadoop.util.DataChecksum; + +/** Composite CRC. */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Unstable +public class CompositeCrcFileChecksum extends FileChecksum { + public static final int LENGTH = Integer.SIZE / Byte.SIZE; + + private int crc; + private DataChecksum.Type crcType; + private int bytesPerCrc; + + /** Create a CompositeCrcFileChecksum. */ + public CompositeCrcFileChecksum( + int crc, DataChecksum.Type crcType, int bytesPerCrc) { + this.crc = crc; + this.crcType = crcType; + this.bytesPerCrc = bytesPerCrc; + } + + @Override + public String getAlgorithmName() { + return "COMPOSITE-" + crcType.name(); + } + + @Override + public int getLength() { + return LENGTH; + } + + @Override + public byte[] getBytes() { + return CrcUtil.intToBytes(crc); + } + + @Override + public ChecksumOpt getChecksumOpt() { + return new ChecksumOpt(crcType, bytesPerCrc); + } + + @Override + public void readFields(DataInput in) throws IOException { + crc = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(crc); + } + + @Override + public String toString() { + return getAlgorithmName() + ":" + String.format("0x%08x", crc); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java index e455abffcd..126e754731 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java @@ -504,4 +504,15 @@ public String toString() { } + /** + * Enum for indicating what mode to use when combining chunk and block + * checksums to define an aggregate FileChecksum. 
This should be considered + * a client-side runtime option rather than a persistent property of any + * stored metadata, which is why this is not part of ChecksumOpt, which + * deals with properties of files at rest. + */ + public enum ChecksumCombineMode { + MD5MD5CRC, // MD5 of block checksums, which are MD5 over chunk CRCs + COMPOSITE_CRC // Block/chunk-independent composite CRC + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java new file mode 100644 index 0000000000..4023995941 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.IOException; + +/** + * Encapsulates logic for composing multiple CRCs into one or more combined CRCs + * corresponding to concatenated underlying data ranges. Optimized for composing + * a large number of CRCs that correspond to underlying chunks of data all of + * same size. + */ +@InterfaceAudience.LimitedPrivate({"Common", "HDFS", "MapReduce", "Yarn"}) +@InterfaceStability.Unstable +public class CrcComposer { + private static final int CRC_SIZE_BYTES = 4; + private static final Logger LOG = LoggerFactory.getLogger(CrcComposer.class); + + private final int crcPolynomial; + private final int precomputedMonomialForHint; + private final long bytesPerCrcHint; + private final long stripeLength; + + private int curCompositeCrc = 0; + private long curPositionInStripe = 0; + private ByteArrayOutputStream digestOut = new ByteArrayOutputStream(); + + /** + * Returns a CrcComposer which will collapse all ingested CRCs into a single + * value. + */ + public static CrcComposer newCrcComposer( + DataChecksum.Type type, long bytesPerCrcHint) + throws IOException { + return newStripedCrcComposer(type, bytesPerCrcHint, Long.MAX_VALUE); + } + + /** + * Returns a CrcComposer which will collapse CRCs for every combined + * underlying data size which aligns with the specified stripe boundary. For + * example, if "update" is called with 20 CRCs and bytesPerCrc == 5, and + * stripeLength == 10, then every two (10 / 5) consecutive CRCs will be + * combined with each other, yielding a list of 10 CRC "stripes" in the + * final digest, each corresponding to 10 underlying data bytes. 
Using + * a stripeLength greater than the total underlying data size is equivalent + * to using a non-striped CrcComposer. + */ + public static CrcComposer newStripedCrcComposer( + DataChecksum.Type type, long bytesPerCrcHint, long stripeLength) + throws IOException { + int polynomial = DataChecksum.getCrcPolynomialForType(type); + return new CrcComposer( + polynomial, + CrcUtil.getMonomial(bytesPerCrcHint, polynomial), + bytesPerCrcHint, + stripeLength); + } + + CrcComposer( + int crcPolynomial, + int precomputedMonomialForHint, + long bytesPerCrcHint, + long stripeLength) { + LOG.debug( + "crcPolynomial=0x{}, precomputedMonomialForHint=0x{}, " + + "bytesPerCrcHint={}, stripeLength={}", + Integer.toString(crcPolynomial, 16), + Integer.toString(precomputedMonomialForHint, 16), + bytesPerCrcHint, + stripeLength); + this.crcPolynomial = crcPolynomial; + this.precomputedMonomialForHint = precomputedMonomialForHint; + this.bytesPerCrcHint = bytesPerCrcHint; + this.stripeLength = stripeLength; + } + + /** + * Composes length / CRC_SIZE_IN_BYTES more CRCs from crcBuffer, with + * each CRC expected to correspond to exactly {@code bytesPerCrc} underlying + * data bytes. + * + * @param length must be a multiple of the expected byte-size of a CRC. + */ + public void update( + byte[] crcBuffer, int offset, int length, long bytesPerCrc) + throws IOException { + if (length % CRC_SIZE_BYTES != 0) { + throw new IOException(String.format( + "Trying to update CRC from byte array with length '%d' at offset " + + "'%d' which is not a multiple of %d!", + length, offset, CRC_SIZE_BYTES)); + } + int limit = offset + length; + while (offset < limit) { + int crcB = CrcUtil.readInt(crcBuffer, offset); + update(crcB, bytesPerCrc); + offset += CRC_SIZE_BYTES; + } + } + + /** + * Composes {@code numChecksumsToRead} additional CRCs into the current digest + * out of {@code checksumIn}, with each CRC expected to correspond to exactly + * {@code bytesPerCrc} underlying data bytes. + */ + public void update( + DataInputStream checksumIn, long numChecksumsToRead, long bytesPerCrc) + throws IOException { + for (long i = 0; i < numChecksumsToRead; ++i) { + int crcB = checksumIn.readInt(); + update(crcB, bytesPerCrc); + } + } + + /** + * Updates with a single additional CRC which corresponds to an underlying + * data size of {@code bytesPerCrc}. + */ + public void update(int crcB, long bytesPerCrc) throws IOException { + if (curCompositeCrc == 0) { + curCompositeCrc = crcB; + } else if (bytesPerCrc == bytesPerCrcHint) { + curCompositeCrc = CrcUtil.composeWithMonomial( + curCompositeCrc, crcB, precomputedMonomialForHint, crcPolynomial); + } else { + curCompositeCrc = CrcUtil.compose( + curCompositeCrc, crcB, bytesPerCrc, crcPolynomial); + } + + curPositionInStripe += bytesPerCrc; + + if (curPositionInStripe > stripeLength) { + throw new IOException(String.format( + "Current position in stripe '%d' after advancing by bytesPerCrc '%d' " + + "exceeds stripeLength '%d' without stripe alignment.", + curPositionInStripe, bytesPerCrc, stripeLength)); + } else if (curPositionInStripe == stripeLength) { + // Hit a stripe boundary; flush the curCompositeCrc and reset for next + // stripe. + digestOut.write(CrcUtil.intToBytes(curCompositeCrc), 0, CRC_SIZE_BYTES); + curCompositeCrc = 0; + curPositionInStripe = 0; + } + } + + /** + * Returns byte representation of composed CRCs; if no stripeLength was + * specified, the digest should be of length equal to exactly one CRC. 
+ * Otherwise, the number of CRCs in the returned array is equal to the + * total sum bytesPerCrc divided by stripeLength. If the sum of bytesPerCrc + * is not a multiple of stripeLength, then the last CRC in the array + * corresponds to totalLength % stripeLength underlying data bytes. + */ + public byte[] digest() { + if (curPositionInStripe > 0) { + digestOut.write(CrcUtil.intToBytes(curCompositeCrc), 0, CRC_SIZE_BYTES); + curCompositeCrc = 0; + curPositionInStripe = 0; + } + byte[] digestValue = digestOut.toByteArray(); + digestOut.reset(); + return digestValue; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java new file mode 100644 index 0000000000..42eaf148d6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; +import java.util.Arrays; + +/** + * This class provides utilities for working with CRCs. + */ +@InterfaceAudience.LimitedPrivate({"Common", "HDFS", "MapReduce", "Yarn"}) +@InterfaceStability.Unstable +public final class CrcUtil { + public static final int MULTIPLICATIVE_IDENTITY = 0x80000000; + public static final int GZIP_POLYNOMIAL = 0xEDB88320; + public static final int CASTAGNOLI_POLYNOMIAL = 0x82F63B78; + + /** + * Hide default constructor for a static utils class. + */ + private CrcUtil() { + } + + /** + * Compute x^({@code lengthBytes} * 8) mod {@code mod}, where {@code mod} is + * in "reversed" (little-endian) format such that {@code mod & 1} represents + * x^31 and has an implicit term x^32. + */ + public static int getMonomial(long lengthBytes, int mod) { + if (lengthBytes == 0) { + return MULTIPLICATIVE_IDENTITY; + } else if (lengthBytes < 0) { + throw new IllegalArgumentException( + "lengthBytes must be positive, got " + lengthBytes); + } + + // Decompose into + // x^degree == x ^ SUM(bit[i] * 2^i) == PRODUCT(x ^ (bit[i] * 2^i)) + // Generate each x^(2^i) by squaring. + // Since 'degree' is in 'bits', but we only need to support byte + // granularity we can begin with x^8. + int multiplier = MULTIPLICATIVE_IDENTITY >>> 8; + int product = MULTIPLICATIVE_IDENTITY; + long degree = lengthBytes; + while (degree > 0) { + if ((degree & 1) != 0) { + product = (product == MULTIPLICATIVE_IDENTITY) ? 
multiplier : + galoisFieldMultiply(product, multiplier, mod); + } + multiplier = galoisFieldMultiply(multiplier, multiplier, mod); + degree >>= 1; + } + return product; + } + + /** + * @param monomial Precomputed x^(lengthBInBytes * 8) mod {@code mod} + */ + public static int composeWithMonomial( + int crcA, int crcB, int monomial, int mod) { + return galoisFieldMultiply(crcA, monomial, mod) ^ crcB; + } + + /** + * @param lengthB length of content corresponding to {@code crcB}, in bytes. + */ + public static int compose(int crcA, int crcB, long lengthB, int mod) { + int monomial = getMonomial(lengthB, mod); + return composeWithMonomial(crcA, crcB, monomial, mod); + } + + /** + * @return 4-byte array holding the big-endian representation of + * {@code value}. + */ + public static byte[] intToBytes(int value) { + byte[] buf = new byte[4]; + try { + writeInt(buf, 0, value); + } catch (IOException ioe) { + // Since this should only be able to occur from code bugs within this + // class rather than user input, we throw as a RuntimeException + // rather than requiring this method to declare throwing IOException + // for something the caller can't control. + throw new RuntimeException(ioe); + } + return buf; + } + + /** + * Writes big-endian representation of {@code value} into {@code buf} + * starting at {@code offset}. buf.length must be greater than or + * equal to offset + 4. + */ + public static void writeInt(byte[] buf, int offset, int value) + throws IOException { + if (offset + 4 > buf.length) { + throw new IOException(String.format( + "writeInt out of bounds: buf.length=%d, offset=%d", + buf.length, offset)); + } + buf[offset + 0] = (byte)((value >>> 24) & 0xff); + buf[offset + 1] = (byte)((value >>> 16) & 0xff); + buf[offset + 2] = (byte)((value >>> 8) & 0xff); + buf[offset + 3] = (byte)(value & 0xff); + } + + /** + * Reads 4-byte big-endian int value from {@code buf} starting at + * {@code offset}. buf.length must be greater than or equal to offset + 4. + */ + public static int readInt(byte[] buf, int offset) + throws IOException { + if (offset + 4 > buf.length) { + throw new IOException(String.format( + "readInt out of bounds: buf.length=%d, offset=%d", + buf.length, offset)); + } + int value = ((buf[offset + 0] & 0xff) << 24) | + ((buf[offset + 1] & 0xff) << 16) | + ((buf[offset + 2] & 0xff) << 8) | + ((buf[offset + 3] & 0xff)); + return value; + } + + /** + * For use with debug statements; verifies bytes.length on creation, + * expecting it to represent exactly one CRC, and returns a hex + * formatted value. + */ + public static String toSingleCrcString(final byte[] bytes) + throws IOException { + if (bytes.length != 4) { + throw new IOException((String.format( + "Unexpected byte[] length '%d' for single CRC. Contents: %s", + bytes.length, Arrays.toString(bytes)))); + } + return String.format("0x%08x", readInt(bytes, 0)); + } + + /** + * For use with debug statements; verifies bytes.length on creation, + * expecting it to be divisible by CRC byte size, and returns a list of + * hex formatted values. + */ + public static String toMultiCrcString(final byte[] bytes) + throws IOException { + if (bytes.length % 4 != 0) { + throw new IOException((String.format( + "Unexpected byte[] length '%d' not divisible by 4. 
Contents: %s", + bytes.length, Arrays.toString(bytes)))); + } + StringBuilder sb = new StringBuilder(); + sb.append('['); + for (int i = 0; i < bytes.length; i += 4) { + sb.append(String.format("0x%08x", readInt(bytes, i))); + if (i != bytes.length - 4) { + sb.append(", "); + } + } + sb.append(']'); + return sb.toString(); + } + + /** + * Galois field multiplication of {@code p} and {@code q} with the + * generator polynomial {@code m} as the modulus. + * + * @param m The little-endian polynomial to use as the modulus when + * multiplying p and q, with implicit "1" bit beyond the bottom bit. + */ + private static int galoisFieldMultiply(int p, int q, int m) { + int summation = 0; + + // Top bit is the x^0 place; each right-shift increments the degree of the + // current term. + int curTerm = MULTIPLICATIVE_IDENTITY; + + // Iteratively multiply p by x mod m as we go to represent the q[i] term + // (of degree x^i) times p. + int px = p; + + while (curTerm != 0) { + if ((q & curTerm) != 0) { + summation ^= px; + } + + // Bottom bit represents highest degree since we're little-endian; before + // we multiply by "x" for the next term, check bottom bit to know whether + // the resulting px will thus have a term matching the implicit "1" term + // of "m" and thus will need to subtract "m" after mutiplying by "x". + boolean hasMaxDegree = ((px & 1) != 0); + px >>>= 1; + if (hasMaxDegree) { + px ^= m; + } + curTerm >>>= 1; + } + return summation; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java index 43e377f1fb..06ef8acc52 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java @@ -104,6 +104,24 @@ static Checksum newCrc32C() { } } + /** + * @return the int representation of the polynomial associated with the + * CRC {@code type}, suitable for use with further CRC arithmetic. + * @throws IOException if there is no CRC polynomial applicable + * to the given {@code type}. + */ + public static int getCrcPolynomialForType(Type type) throws IOException { + switch (type) { + case CRC32: + return CrcUtil.GZIP_POLYNOMIAL; + case CRC32C: + return CrcUtil.CASTAGNOLI_POLYNOMIAL; + default: + throw new IOException( + "No CRC polynomial could be associated with type: " + type); + } + } + public static DataChecksum newDataChecksum(Type type, int bytesPerChecksum ) { if ( bytesPerChecksum <= 0 ) { return null; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java new file mode 100644 index 0000000000..f08702e35e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import static org.junit.Assert.*; + +/** + * Unittests for CrcComposer. + */ +public class TestCrcComposer { + @Rule + public Timeout globalTimeout = new Timeout(10000); + + private Random rand = new Random(1234); + + private DataChecksum.Type type = DataChecksum.Type.CRC32C; + private DataChecksum checksum = DataChecksum.newDataChecksum( + type, Integer.MAX_VALUE); + private int dataSize = 75; + private byte[] data = new byte[dataSize]; + private int chunkSize = 10; + private int cellSize = 20; + + private int fullCrc; + private int[] crcsByChunk; + private int[] crcsByCell; + + private byte[] crcBytesByChunk; + private byte[] crcBytesByCell; + + @Before + public void setup() throws IOException { + rand.nextBytes(data); + fullCrc = getRangeChecksum(data, 0, dataSize); + + // 7 chunks of size chunkSize, 1 chunk of size (dataSize % chunkSize). + crcsByChunk = new int[8]; + for (int i = 0; i < 7; ++i) { + crcsByChunk[i] = getRangeChecksum(data, i * chunkSize, chunkSize); + } + crcsByChunk[7] = getRangeChecksum( + data, (crcsByChunk.length - 1) * chunkSize, dataSize % chunkSize); + + // 3 cells of size cellSize, 1 cell of size (dataSize % cellSize). + crcsByCell = new int[4]; + for (int i = 0; i < 3; ++i) { + crcsByCell[i] = getRangeChecksum(data, i * cellSize, cellSize); + } + crcsByCell[3] = getRangeChecksum( + data, (crcsByCell.length - 1) * cellSize, dataSize % cellSize); + + crcBytesByChunk = intArrayToByteArray(crcsByChunk); + crcBytesByCell = intArrayToByteArray(crcsByCell); + } + + private int getRangeChecksum(byte[] buf, int offset, int length) { + checksum.reset(); + checksum.update(buf, offset, length); + return (int) checksum.getValue(); + } + + private byte[] intArrayToByteArray(int[] values) throws IOException { + byte[] bytes = new byte[values.length * 4]; + for (int i = 0; i < values.length; ++i) { + CrcUtil.writeInt(bytes, i * 4, values[i]); + } + return bytes; + } + + @Test + public void testUnstripedIncorrectChunkSize() throws IOException { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + + // If we incorrectly specify that all CRCs ingested correspond to chunkSize + // when the last CRC in the array actually corresponds to + // dataSize % chunkSize then we expect the resulting CRC to not be equal to + // the fullCrc. 
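+    // Here the last of the 8 chunk CRCs really covers only dataSize % chunkSize == 5 bytes.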
+ digester.update(crcBytesByChunk, 0, crcBytesByChunk.length, chunkSize); + byte[] digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertNotEquals(fullCrc, calculatedCrc); + } + + @Test + public void testUnstripedByteArray() throws IOException { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + digester.update(crcBytesByChunk, 0, crcBytesByChunk.length - 4, chunkSize); + digester.update( + crcBytesByChunk, crcBytesByChunk.length - 4, 4, dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertEquals(fullCrc, calculatedCrc); + } + + @Test + public void testUnstripedDataInputStream() throws IOException { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + DataInputStream input = + new DataInputStream(new ByteArrayInputStream(crcBytesByChunk)); + digester.update(input, crcsByChunk.length - 1, chunkSize); + digester.update(input, 1, dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertEquals(fullCrc, calculatedCrc); + } + + @Test + public void testUnstripedSingleCrcs() throws IOException { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + for (int i = 0; i < crcsByChunk.length - 1; ++i) { + digester.update(crcsByChunk[i], chunkSize); + } + digester.update(crcsByChunk[crcsByChunk.length - 1], dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertEquals(fullCrc, calculatedCrc); + } + + @Test + public void testStripedByteArray() throws IOException { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + digester.update(crcBytesByChunk, 0, crcBytesByChunk.length - 4, chunkSize); + digester.update( + crcBytesByChunk, crcBytesByChunk.length - 4, 4, dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertArrayEquals(crcBytesByCell, digest); + } + + @Test + public void testStripedDataInputStream() throws IOException { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + DataInputStream input = + new DataInputStream(new ByteArrayInputStream(crcBytesByChunk)); + digester.update(input, crcsByChunk.length - 1, chunkSize); + digester.update(input, 1, dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertArrayEquals(crcBytesByCell, digest); + } + + @Test + public void testStripedSingleCrcs() throws IOException { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + for (int i = 0; i < crcsByChunk.length - 1; ++i) { + digester.update(crcsByChunk[i], chunkSize); + } + digester.update(crcsByChunk[crcsByChunk.length - 1], dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertArrayEquals(crcBytesByCell, digest); + } + + @Test + public void testMultiStageMixed() throws IOException { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + + // First combine chunks into cells. + DataInputStream input = + new DataInputStream(new ByteArrayInputStream(crcBytesByChunk)); + digester.update(input, crcsByChunk.length - 1, chunkSize); + digester.update(input, 1, dataSize % chunkSize); + byte[] digest = digester.digest(); + + // Second, individually combine cells into full crc. 
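+    // The striped digest holds 4 cell CRCs: three covering cellSize bytes each and a final one covering dataSize % cellSize == 15 bytes.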
+ digester = + CrcComposer.newCrcComposer(type, cellSize); + for (int i = 0; i < digest.length - 4; i += 4) { + int cellCrc = CrcUtil.readInt(digest, i); + digester.update(cellCrc, cellSize); + } + digester.update(digest, digest.length - 4, 4, dataSize % cellSize); + digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertEquals(fullCrc, calculatedCrc); + } + + @Test + public void testUpdateMismatchesStripe() throws Exception { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + + digester.update(crcsByChunk[0], chunkSize); + + // Going from chunkSize to chunkSize + cellSize will cross a cellSize + // boundary in a single CRC, which is not allowed, since we'd lack a + // CRC corresponding to the actual cellSize boundary. + LambdaTestUtils.intercept( + IOException.class, + "stripe", + () -> digester.update(crcsByChunk[1], cellSize)); + } + + @Test + public void testUpdateByteArrayLengthUnalignedWithCrcSize() + throws Exception { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + + LambdaTestUtils.intercept( + IOException.class, + "length", + () -> digester.update(crcBytesByChunk, 0, 6, chunkSize)); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java new file mode 100644 index 0000000000..a98cb8a675 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import static org.junit.Assert.*; + +/** + * Unittests for CrcUtil. 
+ */ +public class TestCrcUtil { + @Rule + public Timeout globalTimeout = new Timeout(10000); + + private Random rand = new Random(1234); + + @Test + public void testComposeCrc32() throws IOException { + byte[] data = new byte[64 * 1024]; + rand.nextBytes(data); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 512, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 511, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024 - 1, false); + } + + @Test + public void testComposeCrc32c() throws IOException { + byte[] data = new byte[64 * 1024]; + rand.nextBytes(data); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 512, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 511, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024 - 1, false); + } + + @Test + public void testComposeCrc32WithMonomial() throws IOException { + byte[] data = new byte[64 * 1024]; + rand.nextBytes(data); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 512, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 511, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024 - 1, true); + } + + @Test + public void testComposeCrc32cWithMonomial() throws IOException { + byte[] data = new byte[64 * 1024]; + rand.nextBytes(data); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 512, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 511, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024 - 1, true); + } + + @Test + public void testComposeCrc32ZeroLength() throws IOException { + doTestComposeCrcZerolength(DataChecksum.Type.CRC32); + } + + @Test + public void testComposeCrc32CZeroLength() throws IOException { + doTestComposeCrcZerolength(DataChecksum.Type.CRC32C); + } + + /** + * Helper method to compare a DataChecksum-computed end-to-end CRC against + * a piecewise-computed CRC that uses CrcUtil.compose on "chunk CRCs" + * corresponding to ever {@code chunkSize} bytes. + */ + private static void doTestComposeCrc( + byte[] data, DataChecksum.Type type, int chunkSize, boolean useMonomial) + throws IOException { + int crcPolynomial = DataChecksum.getCrcPolynomialForType(type); + + // Get full end-to-end CRC in a single shot first. + DataChecksum checksum = DataChecksum.newDataChecksum( + type, Integer.MAX_VALUE); + checksum.update(data, 0, data.length); + int fullCrc = (int) checksum.getValue(); + + // Now compute CRCs of each chunk individually first, and compose them in a + // second pass to compare to the end-to-end CRC. + int compositeCrc = 0; + int crcMonomial = + useMonomial ? CrcUtil.getMonomial(chunkSize, crcPolynomial) : 0; + for (int offset = 0; + offset + chunkSize <= data.length; + offset += chunkSize) { + checksum.reset(); + checksum.update(data, offset, chunkSize); + int partialCrc = (int) checksum.getValue(); + if (useMonomial) { + compositeCrc = CrcUtil.composeWithMonomial( + compositeCrc, partialCrc, crcMonomial, crcPolynomial); + } else { + compositeCrc = CrcUtil.compose( + compositeCrc, partialCrc, chunkSize, crcPolynomial); + } + } + + // There may be a final partial chunk smaller than chunkSize. 
+ int partialChunkSize = data.length % chunkSize; + if (partialChunkSize > 0) { + checksum.reset(); + checksum.update(data, data.length - partialChunkSize, partialChunkSize); + int partialCrc = (int) checksum.getValue(); + compositeCrc = CrcUtil.compose( + compositeCrc, partialCrc, partialChunkSize, crcPolynomial); + } + assertEquals( + String.format( + "Using CRC type '%s' with crcPolynomial '0x%08x' and chunkSize '%d'" + + ", expected '0x%08x', got '0x%08x'", + type, crcPolynomial, chunkSize, fullCrc, compositeCrc), + fullCrc, + compositeCrc); + } + + /** + * Helper method for testing the behavior of composing a CRC with a + * zero-length second CRC. + */ + private static void doTestComposeCrcZerolength(DataChecksum.Type type) + throws IOException { + // Without loss of generality, we can pick any integer as our fake crcA + // even if we don't happen to know the preimage. + int crcA = 0xCAFEBEEF; + int crcPolynomial = DataChecksum.getCrcPolynomialForType(type); + DataChecksum checksum = DataChecksum.newDataChecksum( + type, Integer.MAX_VALUE); + int crcB = (int) checksum.getValue(); + assertEquals(crcA, CrcUtil.compose(crcA, crcB, 0, crcPolynomial)); + + int monomial = CrcUtil.getMonomial(0, crcPolynomial); + assertEquals( + crcA, CrcUtil.composeWithMonomial(crcA, crcB, monomial, crcPolynomial)); + } + + @Test + public void testIntSerialization() throws IOException { + byte[] bytes = CrcUtil.intToBytes(0xCAFEBEEF); + assertEquals(0xCAFEBEEF, CrcUtil.readInt(bytes, 0)); + + bytes = new byte[8]; + CrcUtil.writeInt(bytes, 0, 0xCAFEBEEF); + assertEquals(0xCAFEBEEF, CrcUtil.readInt(bytes, 0)); + CrcUtil.writeInt(bytes, 4, 0xABCDABCD); + assertEquals(0xABCDABCD, CrcUtil.readInt(bytes, 4)); + + // Assert big-endian format for general Java consistency. + assertEquals(0xBEEFABCD, CrcUtil.readInt(bytes, 2)); + } + + @Test + public void testToSingleCrcStringBadLength() + throws Exception { + LambdaTestUtils.intercept( + IOException.class, + "length", + () -> CrcUtil.toSingleCrcString(new byte[8])); + } + + @Test + public void testToSingleCrcString() throws IOException { + byte[] buf = CrcUtil.intToBytes(0xcafebeef); + assertEquals( + "0xcafebeef", CrcUtil.toSingleCrcString(buf)); + } + + @Test + public void testToMultiCrcStringBadLength() + throws Exception { + LambdaTestUtils.intercept( + IOException.class, + "length", + () -> CrcUtil.toMultiCrcString(new byte[6])); + } + + @Test + public void testToMultiCrcStringMultipleElements() + throws IOException { + byte[] buf = new byte[12]; + CrcUtil.writeInt(buf, 0, 0xcafebeef); + CrcUtil.writeInt(buf, 4, 0xababcccc); + CrcUtil.writeInt(buf, 8, 0xddddefef); + assertEquals( + "[0xcafebeef, 0xababcccc, 0xddddefef]", + CrcUtil.toMultiCrcString(buf)); + } + + @Test + public void testToMultiCrcStringSingleElement() + throws IOException { + byte[] buf = new byte[4]; + CrcUtil.writeInt(buf, 0, 0xcafebeef); + assertEquals( + "[0xcafebeef]", + CrcUtil.toMultiCrcString(buf)); + } + + @Test + public void testToMultiCrcStringNoElements() + throws IOException { + assertEquals( + "[]", + CrcUtil.toMultiCrcString(new byte[0])); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java index 0138195345..d07d5a2644 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java @@ -130,9 +130,9 @@ public BlockLocation[] 
getFileBlockLocations(Path p, long start, long len) } @Override - public FileChecksum getFileChecksum(Path f) + public FileChecksum getFileChecksum(Path f) throws IOException, UnresolvedLinkException { - return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE); + return dfs.getFileChecksumWithCombineMode(getUriPath(f), Long.MAX_VALUE); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 0875328d8a..09154d0778 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -66,6 +66,7 @@ import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -76,6 +77,7 @@ import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Options.ChecksumCombineMode; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; @@ -1753,18 +1755,8 @@ public DataEncryptionKey getEncryptionKey() { return encryptionKey; } - /** - * Get the checksum of the whole file or a range of the file. Note that the - * range always starts from the beginning of the file. The file can be - * in replicated form, or striped mode. It can be used to checksum and compare - * two replicated files, or two striped files, but not applicable for two - * files of different block layout forms. - * @param src The file path - * @param length the length of the range, i.e., the range is [0, length] - * @return The checksum - * @see DistributedFileSystem#getFileChecksum(Path) - */ - public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) + private FileChecksum getFileChecksumInternal( + String src, long length, ChecksumCombineMode combineMode) throws IOException { checkOpen(); Preconditions.checkArgument(length >= 0); @@ -1779,15 +1771,51 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) maker = ecPolicy != null ? new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src, - length, blockLocations, namenode, this, ecPolicy) : + length, blockLocations, namenode, this, ecPolicy, combineMode) : new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length, - blockLocations, namenode, this); + blockLocations, namenode, this, combineMode); maker.compute(); return maker.getFileChecksum(); } + /** + * Get the checksum of the whole file or a range of the file. Note that the + * range always starts from the beginning of the file. The file can be + * in replicated form, or striped mode. Depending on the + * dfs.checksum.combine.mode, checksums may or may not be comparable between + * different block layout forms. 
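+   * In particular, COMPOSITE_CRC is independent of block/chunk layout, while + * MD5MD5CRC is only comparable between files with the same block layout.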
+ * + * @param src The file path + * @param length the length of the range, i.e., the range is [0, length] + * @return The checksum + * @see DistributedFileSystem#getFileChecksum(Path) + */ + public FileChecksum getFileChecksumWithCombineMode(String src, long length) + throws IOException { + ChecksumCombineMode combineMode = getConf().getChecksumCombineMode(); + return getFileChecksumInternal(src, length, combineMode); + } + + /** + * Get the checksum of the whole file or a range of the file. Note that the + * range always starts from the beginning of the file. The file can be + * in replicated form, or striped mode. It can be used to checksum and compare + * two replicated files, or two striped files, but not applicable for two + * files of different block layout forms. + * + * @param src The file path + * @param length the length of the range, i.e., the range is [0, length] + * @return The checksum + * @see DistributedFileSystem#getFileChecksum(Path) + */ + public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) + throws IOException { + return (MD5MD5CRC32FileChecksum) getFileChecksumInternal( + src, length, ChecksumCombineMode.MD5MD5CRC); + } + protected LocatedBlocks getBlockLocations(String src, long length) throws IOException { //get block locations for the file range diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 03cb317b58..1e9ed097ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -1681,7 +1681,8 @@ public FileChecksum getFileChecksum(Path f) throws IOException { return new FileSystemLinkResolver() { @Override public FileChecksum doCall(final Path p) throws IOException { - return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE); + return dfs.getFileChecksumWithCombineMode( + getPathName(p), Long.MAX_VALUE); } @Override @@ -1701,7 +1702,7 @@ public FileChecksum getFileChecksum(Path f, final long length) return new FileSystemLinkResolver() { @Override public FileChecksum doCall(final Path p) throws IOException { - return dfs.getFileChecksum(getPathName(p), length); + return dfs.getFileChecksumWithCombineMode(getPathName(p), length); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java index 72cf147a08..8f807d5f40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java @@ -17,9 +17,14 @@ */ package org.apache.hadoop.hdfs; +import org.apache.hadoop.fs.CompositeCrcFileChecksum; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; -import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; +import org.apache.hadoop.fs.Options.ChecksumCombineMode; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions; +import org.apache.hadoop.hdfs.protocol.BlockChecksumType; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import 
org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -41,6 +46,8 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.util.CrcComposer; +import org.apache.hadoop.util.CrcUtil; import org.apache.hadoop.util.DataChecksum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,9 +74,11 @@ static abstract class FileChecksumComputer { private final long length; private final DFSClient client; private final ClientProtocol namenode; - private final DataOutputBuffer md5out = new DataOutputBuffer(); + private final ChecksumCombineMode combineMode; + private final BlockChecksumType blockChecksumType; + private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer(); - private MD5MD5CRC32FileChecksum fileChecksum; + private FileChecksum fileChecksum; private LocatedBlocks blockLocations; private int timeout; @@ -88,12 +97,24 @@ static abstract class FileChecksumComputer { FileChecksumComputer(String src, long length, LocatedBlocks blockLocations, ClientProtocol namenode, - DFSClient client) throws IOException { + DFSClient client, + ChecksumCombineMode combineMode) throws IOException { this.src = src; this.length = length; this.blockLocations = blockLocations; this.namenode = namenode; this.client = client; + this.combineMode = combineMode; + switch (combineMode) { + case MD5MD5CRC: + this.blockChecksumType = BlockChecksumType.MD5CRC; + break; + case COMPOSITE_CRC: + this.blockChecksumType = BlockChecksumType.COMPOSITE_CRC; + break; + default: + throw new IOException("Unknown ChecksumCombineMode: " + combineMode); + } this.remaining = length; @@ -121,11 +142,19 @@ ClientProtocol getNamenode() { return namenode; } - DataOutputBuffer getMd5out() { - return md5out; + ChecksumCombineMode getCombineMode() { + return combineMode; } - MD5MD5CRC32FileChecksum getFileChecksum() { + BlockChecksumType getBlockChecksumType() { + return blockChecksumType; + } + + DataOutputBuffer getBlockChecksumBuf() { + return blockChecksumBuf; + } + + FileChecksum getFileChecksum() { return fileChecksum; } @@ -226,17 +255,31 @@ void compute() throws IOException { } /** - * Compute and aggregate block checksums block by block. + * Compute block checksums block by block and append the raw bytes of the + * block checksums into getBlockChecksumBuf(). + * * @throws IOException */ abstract void checksumBlocks() throws IOException; /** - * Make final file checksum result given the computing process done. + * Make final file checksum result given the per-block or per-block-group + * checksums collected into getBlockChecksumBuf(). 
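+     * For MD5MD5CRC this digests the buffered block MD5s into an MD5-of-MD5s; for + * COMPOSITE_CRC it composes the buffered block CRCs into a single file-level CRC.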
*/ - MD5MD5CRC32FileChecksum makeFinalResult() { + FileChecksum makeFinalResult() throws IOException { + switch (combineMode) { + case MD5MD5CRC: + return makeMd5CrcResult(); + case COMPOSITE_CRC: + return makeCompositeCrcResult(); + default: + throw new IOException("Unknown ChecksumCombineMode: " + combineMode); + } + } + + FileChecksum makeMd5CrcResult() { //compute file MD5 - final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); + final MD5Hash fileMD5 = MD5Hash.digest(blockChecksumBuf.getData()); switch (crcType) { case CRC32: return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, @@ -250,6 +293,58 @@ MD5MD5CRC32FileChecksum makeFinalResult() { } } + FileChecksum makeCompositeCrcResult() throws IOException { + long blockSizeHint = 0; + if (locatedBlocks.size() > 0) { + blockSizeHint = locatedBlocks.get(0).getBlockSize(); + } + CrcComposer crcComposer = + CrcComposer.newCrcComposer(getCrcType(), blockSizeHint); + byte[] blockChecksumBytes = blockChecksumBuf.getData(); + + long sumBlockLengths = 0; + for (int i = 0; i < locatedBlocks.size() - 1; ++i) { + LocatedBlock block = locatedBlocks.get(i); + // For everything except the last LocatedBlock, we expect getBlockSize() + // to accurately reflect the number of file bytes digested in the block + // checksum. + sumBlockLengths += block.getBlockSize(); + int blockCrc = CrcUtil.readInt(blockChecksumBytes, i * 4); + + crcComposer.update(blockCrc, block.getBlockSize()); + LOG.debug( + "Added blockCrc 0x{} for block index {} of size {}", + Integer.toString(blockCrc, 16), i, block.getBlockSize()); + } + + // NB: In some cases the located blocks have their block size adjusted + // explicitly based on the requested length, but not all cases; + // these numbers may or may not reflect actual sizes on disk. + long reportedLastBlockSize = + blockLocations.getLastLocatedBlock().getBlockSize(); + long consumedLastBlockLength = reportedLastBlockSize; + if (length - sumBlockLengths < reportedLastBlockSize) { + LOG.warn( + "Last block length {} is less than reportedLastBlockSize {}", + length - sumBlockLengths, reportedLastBlockSize); + consumedLastBlockLength = length - sumBlockLengths; + } + // NB: blockChecksumBytes.length may be much longer than actual bytes + // written into the DataOutput. + int lastBlockCrc = CrcUtil.readInt( + blockChecksumBytes, 4 * (locatedBlocks.size() - 1)); + crcComposer.update(lastBlockCrc, consumedLastBlockLength); + LOG.debug( + "Added lastBlockCrc 0x{} for block index {} of size {}", + Integer.toString(lastBlockCrc, 16), + locatedBlocks.size() - 1, + consumedLastBlockLength); + + int compositeCrc = CrcUtil.readInt(crcComposer.digest(), 0); + return new CompositeCrcFileChecksum( + compositeCrc, getCrcType(), bytesPerCRC); + } + /** * Create and return a sender given an IO stream pair. */ @@ -267,6 +362,117 @@ void close(IOStreamPair pair) { IOUtils.closeStream(pair.out); } } + + /** + * Parses out various checksum properties like bytesPerCrc, crcPerBlock, + * and crcType from {@code checksumData} and either stores them as the + * authoritative value or compares them to a previously extracted value + * to check comppatibility. 
+ * + * @param checksumData response from the datanode + * @param locatedBlock the block corresponding to the response + * @param datanode the datanode which produced the response + * @param blockIdx the block or block-group index of the response + */ + void extractChecksumProperties( + OpBlockChecksumResponseProto checksumData, + LocatedBlock locatedBlock, + DatanodeInfo datanode, + int blockIdx) + throws IOException { + //read byte-per-checksum + final int bpc = checksumData.getBytesPerCrc(); + if (blockIdx == 0) { //first block + setBytesPerCRC(bpc); + } else if (bpc != getBytesPerCRC()) { + if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) { + LOG.warn( + "Current bytesPerCRC={} doesn't match next bpc={}, but " + + "continuing anyway because we're using COMPOSITE_CRC. " + + "If trying to preserve CHECKSUMTYPE, only the current " + + "bytesPerCRC will be preserved.", getBytesPerCRC(), bpc); + } else { + throw new IOException("Byte-per-checksum not matched: bpc=" + bpc + + " but bytesPerCRC=" + getBytesPerCRC()); + } + } + + //read crc-per-block + final long cpb = checksumData.getCrcPerBlock(); + if (getLocatedBlocks().size() > 1 && blockIdx == 0) { + setCrcPerBlock(cpb); + } + + // read crc-type + final DataChecksum.Type ct; + if (checksumData.hasCrcType()) { + ct = PBHelperClient.convert(checksumData.getCrcType()); + } else { + LOG.debug("Retrieving checksum from an earlier-version DataNode: " + + "inferring checksum by reading first byte"); + ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode); + } + + if (blockIdx == 0) { + setCrcType(ct); + } else if (getCrcType() != DataChecksum.Type.MIXED && + getCrcType() != ct) { + if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) { + throw new IOException( + "DataChecksum.Type.MIXED is not supported for COMPOSITE_CRC"); + } else { + // if crc types are mixed in a file + setCrcType(DataChecksum.Type.MIXED); + } + } + + if (blockIdx == 0) { + LOG.debug("set bytesPerCRC={}, crcPerBlock={}", + getBytesPerCRC(), getCrcPerBlock()); + } + } + + /** + * Parses out the raw blockChecksum bytes from {@code checksumData} + * according to the blockChecksumType and populates the cumulative + * blockChecksumBuf with it. + * + * @return a debug-string representation of the parsed checksum if + * debug is enabled, otherwise null. 
+ */ + String populateBlockChecksumBuf(OpBlockChecksumResponseProto checksumData) + throws IOException { + String blockChecksumForDebug = null; + switch (getBlockChecksumType()) { + case MD5CRC: + //read md5 + final MD5Hash md5 = new MD5Hash( + checksumData.getBlockChecksum().toByteArray()); + md5.write(getBlockChecksumBuf()); + if (LOG.isDebugEnabled()) { + blockChecksumForDebug = md5.toString(); + } + break; + case COMPOSITE_CRC: + BlockChecksumType returnedType = PBHelperClient.convert( + checksumData.getBlockChecksumOptions().getBlockChecksumType()); + if (returnedType != BlockChecksumType.COMPOSITE_CRC) { + throw new IOException(String.format( + "Unexpected blockChecksumType '%s', expecting COMPOSITE_CRC", + returnedType)); + } + byte[] crcBytes = checksumData.getBlockChecksum().toByteArray(); + if (LOG.isDebugEnabled()) { + blockChecksumForDebug = CrcUtil.toSingleCrcString(crcBytes); + } + getBlockChecksumBuf().write(crcBytes); + break; + default: + throw new IOException( + "Unknown BlockChecksumType: " + getBlockChecksumType()); + } + return blockChecksumForDebug; + } } /** @@ -278,8 +484,10 @@ static class ReplicatedFileChecksumComputer extends FileChecksumComputer { ReplicatedFileChecksumComputer(String src, long length, LocatedBlocks blockLocations, ClientProtocol namenode, - DFSClient client) throws IOException { - super(src, length, blockLocations, namenode, client); + DFSClient client, + ChecksumCombineMode combineMode) + throws IOException { + super(src, length, blockLocations, namenode, client, combineMode); } @Override @@ -295,7 +503,8 @@ void checksumBlocks() throws IOException { LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx); if (!checksumBlock(locatedBlock)) { - throw new IOException("Fail to get block MD5 for " + locatedBlock); + throw new PathIOException( + getSrc(), "Fail to get block MD5 for " + locatedBlock); } } } @@ -368,9 +577,11 @@ private void tryDatanode(LocatedBlock locatedBlock, LOG.debug("write to {}: {}, block={}", datanode, Op.BLOCK_CHECKSUM, block); - // get block MD5 - createSender(pair).blockChecksum(block, - locatedBlock.getBlockToken()); + // get block checksum + createSender(pair).blockChecksum( + block, + locatedBlock.getBlockToken(), + new BlockChecksumOptions(getBlockChecksumType())); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(pair.in)); @@ -381,51 +592,11 @@ private void tryDatanode(LocatedBlock locatedBlock, OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse(); - - //read byte-per-checksum - final int bpc = checksumData.getBytesPerCrc(); - if (blockIdx == 0) { //first block - setBytesPerCRC(bpc); - } else if (bpc != getBytesPerCRC()) { - throw new IOException("Byte-per-checksum not matched: bpc=" + bpc - + " but bytesPerCRC=" + getBytesPerCRC()); - } - - //read crc-per-block - final long cpb = checksumData.getCrcPerBlock(); - if (getLocatedBlocks().size() > 1 && blockIdx == 0) { - setCrcPerBlock(cpb); - } - - //read md5 - final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray()); - md5.write(getMd5out()); - - // read crc-type - final DataChecksum.Type ct; - if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData.getCrcType()); - } else { - LOG.debug("Retrieving checksum from an earlier-version DataNode: " + - "inferring checksum by reading first byte"); - ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode); - } - - if (blockIdx == 0) { // first block - setCrcType(ct); - } else if (getCrcType() != 
DataChecksum.Type.MIXED - && getCrcType() != ct) { - // if crc types are mixed in a file - setCrcType(DataChecksum.Type.MIXED); - } - - if (LOG.isDebugEnabled()) { - if (blockIdx == 0) { - LOG.debug("set bytesPerCRC=" + getBytesPerCRC() - + ", crcPerBlock=" + getCrcPerBlock()); - } - LOG.debug("got reply from " + datanode + ": md5=" + md5); - } + extractChecksumProperties( + checksumData, locatedBlock, datanode, blockIdx); + String blockChecksumForDebug = populateBlockChecksumBuf(checksumData); + LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}", + datanode, blockChecksumForDebug, getBlockChecksumType()); } } } @@ -442,9 +613,10 @@ static class StripedFileNonStripedChecksumComputer LocatedBlocks blockLocations, ClientProtocol namenode, DFSClient client, - ErasureCodingPolicy ecPolicy) + ErasureCodingPolicy ecPolicy, + ChecksumCombineMode combineMode) throws IOException { - super(src, length, blockLocations, namenode, client); + super(src, length, blockLocations, namenode, client, combineMode); this.ecPolicy = ecPolicy; } @@ -464,7 +636,8 @@ void checksumBlocks() throws IOException { LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock; if (!checksumBlockGroup(blockGroup)) { - throw new IOException("Fail to get block MD5 for " + locatedBlock); + throw new PathIOException( + getSrc(), "Fail to get block checksum for " + locatedBlock); } } } @@ -519,16 +692,18 @@ private void tryDatanode(LocatedStripedBlock blockGroup, StripedBlockInfo stripedBlockInfo, DatanodeInfo datanode, long requestedNumBytes) throws IOException { - try (IOStreamPair pair = getClient().connectToDN(datanode, getTimeout(), blockGroup.getBlockToken())) { LOG.debug("write to {}: {}, blockGroup={}", datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup); - // get block MD5 - createSender(pair).blockGroupChecksum(stripedBlockInfo, - blockGroup.getBlockToken(), requestedNumBytes); + // get block group checksum + createSender(pair).blockGroupChecksum( + stripedBlockInfo, + blockGroup.getBlockToken(), + requestedNumBytes, + new BlockChecksumOptions(getBlockChecksumType())); BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(pair.in)); @@ -538,54 +713,10 @@ private void tryDatanode(LocatedStripedBlock blockGroup, DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse(); - - //read byte-per-checksum - final int bpc = checksumData.getBytesPerCrc(); - if (bgIdx == 0) { //first block - setBytesPerCRC(bpc); - } else { - if (bpc != getBytesPerCRC()) { - throw new IOException("Byte-per-checksum not matched: bpc=" + bpc - + " but bytesPerCRC=" + getBytesPerCRC()); - } - } - - //read crc-per-block - final long cpb = checksumData.getCrcPerBlock(); - if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block - setCrcPerBlock(cpb); - } - - //read md5 - final MD5Hash md5 = new MD5Hash( - checksumData.getMd5().toByteArray()); - md5.write(getMd5out()); - - // read crc-type - final DataChecksum.Type ct; - if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData.getCrcType()); - } else { - LOG.debug("Retrieving checksum from an earlier-version DataNode: " + - "inferring checksum by reading first byte"); - ct = getClient().inferChecksumTypeByReading(blockGroup, datanode); - } - - if (bgIdx == 0) { - setCrcType(ct); - } else if (getCrcType() != DataChecksum.Type.MIXED && - getCrcType() != ct) { - // if crc types are mixed in a file - setCrcType(DataChecksum.Type.MIXED); - } - - if 
(LOG.isDebugEnabled()) { - if (bgIdx == 0) { - LOG.debug("set bytesPerCRC=" + getBytesPerCRC() - + ", crcPerBlock=" + getCrcPerBlock()); - } - LOG.debug("got reply from " + datanode + ": md5=" + md5); - } + extractChecksumProperties(checksumData, blockGroup, datanode, bgIdx); + String blockChecksumForDebug = populateBlockChecksumBuf(checksumData); + LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}", + datanode, blockChecksumForDebug, getBlockChecksumType()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 52a7cd0c08..f2cec314ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -120,6 +120,8 @@ public interface HdfsClientConfigKeys { String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C"; String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum"; int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512; + String DFS_CHECKSUM_COMBINE_MODE_KEY = "dfs.checksum.combine.mode"; + String DFS_CHECKSUM_COMBINE_MODE_DEFAULT = "MD5MD5CRC"; String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout"; String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 2703617812..e63e3f53b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -22,6 +22,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Options.ChecksumCombineMode; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.ReplicaAccessorBuilder; @@ -38,6 +39,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT; @@ -106,6 +109,7 @@ public class DfsClientConf { private final int datanodeSocketWriteTimeout; private final int ioBufferSize; private final ChecksumOpt defaultChecksumOpt; + private final ChecksumCombineMode checksumCombineMode; private final int writePacketSize; private final int writeMaxPackets; private final ByteArrayManager.Conf writeByteArrayManagerConf; @@ -177,6 +181,7 @@ public DfsClientConf(Configuration conf) { 
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); defaultChecksumOpt = getChecksumOptFromConf(conf); + checksumCombineMode = getChecksumCombineModeFromConf(conf); dataTransferTcpNoDelay = conf.getBoolean( DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY, DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT); @@ -300,6 +305,21 @@ private static DataChecksum.Type getChecksumType(Configuration conf) { } } + private static ChecksumCombineMode getChecksumCombineModeFromConf( + Configuration conf) { + final String mode = conf.get( + DFS_CHECKSUM_COMBINE_MODE_KEY, + DFS_CHECKSUM_COMBINE_MODE_DEFAULT); + try { + return ChecksumCombineMode.valueOf(mode); + } catch(IllegalArgumentException iae) { + LOG.warn("Bad checksum combine mode: {}. Using default {}", mode, + DFS_CHECKSUM_COMBINE_MODE_DEFAULT); + return ChecksumCombineMode.valueOf( + DFS_CHECKSUM_COMBINE_MODE_DEFAULT); + } + } + // Construct a checksum option from conf public static ChecksumOpt getChecksumOptFromConf(Configuration conf) { DataChecksum.Type type = getChecksumType(conf); @@ -392,6 +412,13 @@ public ChecksumOpt getDefaultChecksumOpt() { return defaultChecksumOpt; } + /** + * @return the checksumCombineMode + */ + public ChecksumCombineMode getChecksumCombineMode() { + return checksumCombineMode; + } + /** * @return the writePacketSize */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java new file mode 100644 index 0000000000..82e07d42d9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Encapsulates various options related to how fine-grained data checksums are + * combined into block-level checksums. 
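[Editor's note, not part of the patch] A minimal, illustrative-only sketch of how a client could opt into the new combine mode through the DfsClientConf path added above. The class name is invented; the behavior relied on is only what getChecksumCombineModeFromConf shows: an unrecognized value is logged and falls back to the MD5MD5CRC default rather than failing.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.ChecksumCombineMode;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

public class CombineModeConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Opt this client into the new combine mode.
    conf.set(HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC");
    ChecksumCombineMode mode = new DfsClientConf(conf).getChecksumCombineMode();
    // A bad value (e.g. "COMPOSITE-CRC") would log a warning and yield the
    // MD5MD5CRC default instead, mirroring getChecksumCombineModeFromConf.
    System.out.println("Checksum combine mode: " + mode);
  }
}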
+ */ +@InterfaceAudience.Private +public class BlockChecksumOptions { + private final BlockChecksumType blockChecksumType; + private final long stripeLength; + + public BlockChecksumOptions( + BlockChecksumType blockChecksumType, long stripeLength) { + this.blockChecksumType = blockChecksumType; + this.stripeLength = stripeLength; + } + + public BlockChecksumOptions(BlockChecksumType blockChecksumType) { + this(blockChecksumType, 0); + } + + public BlockChecksumType getBlockChecksumType() { + return blockChecksumType; + } + + public long getStripeLength() { + return stripeLength; + } + + @Override + public String toString() { + return String.format("blockChecksumType=%s, stripedLength=%d", + blockChecksumType, stripeLength); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java new file mode 100644 index 0000000000..cc33660b5d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Algorithms/types denoting how block-level checksums are computed using + * lower-level chunk checksums/CRCs. + */ +@InterfaceAudience.Private +public enum BlockChecksumType { + MD5CRC, // BlockChecksum obtained by taking the MD5 digest of chunk CRCs + COMPOSITE_CRC // Chunk-independent CRC, optionally striped +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index fe20c37032..384f1dc350 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; @@ -214,11 +215,13 @@ void copyBlock(final ExtendedBlock blk, * * @param blk a block. * @param blockToken security token for accessing the block. 
+ * @param blockChecksumOptions determines how the block-level checksum is + * computed from underlying block metadata. * @throws IOException */ void blockChecksum(ExtendedBlock blk, - Token blockToken) throws IOException; - + Token blockToken, + BlockChecksumOptions blockChecksumOptions) throws IOException; /** * Get striped block group checksum (MD5 of CRC32). @@ -227,9 +230,12 @@ void blockChecksum(ExtendedBlock blk, * @param blockToken security token for accessing the block. * @param requestedNumBytes requested number of bytes in the block group * to compute the checksum. + * @param blockChecksumOptions determines how the block-level checksum is + * computed from underlying block metadata. * @throws IOException */ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, Token blockToken, - long requestedNumBytes) throws IOException; + long requestedNumBytes, + BlockChecksumOptions blockChecksumOptions) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 8a8d20ddb5..7526f96a93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; @@ -267,9 +268,11 @@ public void copyBlock(final ExtendedBlock blk, @Override public void blockChecksum(final ExtendedBlock blk, - final Token blockToken) throws IOException { + final Token blockToken, + BlockChecksumOptions blockChecksumOptions) throws IOException { OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions)) .build(); send(out, Op.BLOCK_CHECKSUM, proto); @@ -277,8 +280,9 @@ public void blockChecksum(final ExtendedBlock blk, @Override public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, - Token blockToken, long requestedNumBytes) - throws IOException { + Token blockToken, + long requestedNumBytes, + BlockChecksumOptions blockChecksumOptions) throws IOException { OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader( stripedBlockInfo.getBlock(), blockToken)) @@ -291,6 +295,7 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, .setEcPolicy(PBHelperClient.convertErasureCodingPolicy( stripedBlockInfo.getErasureCodingPolicy())) .setRequestedNumBytes(requestedNumBytes) + .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions)) .build(); send(out, Op.BLOCK_GROUP_CHECKSUM, proto); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index d9e7aa03bb..ff9733c66a 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -61,6 +61,8 @@ import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions; +import org.apache.hadoop.hdfs.protocol.BlockChecksumType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -247,6 +249,48 @@ public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) { return HdfsProtos.ChecksumTypeProto.valueOf(type.id); } + public static HdfsProtos.BlockChecksumTypeProto convert( + BlockChecksumType type) { + switch(type) { + case MD5CRC: + return HdfsProtos.BlockChecksumTypeProto.MD5CRC; + case COMPOSITE_CRC: + return HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC; + default: + throw new IllegalStateException( + "BUG: BlockChecksumType not found, type=" + type); + } + } + + public static BlockChecksumType convert( + HdfsProtos.BlockChecksumTypeProto blockChecksumTypeProto) { + switch(blockChecksumTypeProto) { + case MD5CRC: + return BlockChecksumType.MD5CRC; + case COMPOSITE_CRC: + return BlockChecksumType.COMPOSITE_CRC; + default: + throw new IllegalStateException( + "BUG: BlockChecksumTypeProto not found, type=" + + blockChecksumTypeProto); + } + } + + public static HdfsProtos.BlockChecksumOptionsProto convert( + BlockChecksumOptions options) { + return HdfsProtos.BlockChecksumOptionsProto.newBuilder() + .setBlockChecksumType(convert(options.getBlockChecksumType())) + .setStripeLength(options.getStripeLength()) + .build(); + } + + public static BlockChecksumOptions convert( + HdfsProtos.BlockChecksumOptionsProto options) { + return new BlockChecksumOptions( + convert(options.getBlockChecksumType()), + options.getStripeLength()); + } + public static ExtendedBlockProto convert(final ExtendedBlock b) { if (b == null) return null; return ExtendedBlockProto.newBuilder(). 
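[Editor's note, not part of the patch] To make the intent of the new PBHelperClient converters concrete, here is a small round-trip sketch showing that a BlockChecksumOptions survives conversion to BlockChecksumOptionsProto and back. The class name and the 1 MB stripe value are invented for illustration; the convert overloads used are the ones added in the hunk above.

import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

public class BlockChecksumOptionsRoundTrip {
  public static void main(String[] args) {
    // COMPOSITE_CRC with a 1 MB stripe, as a block-group checksum might request.
    BlockChecksumOptions options =
        new BlockChecksumOptions(BlockChecksumType.COMPOSITE_CRC, 1024 * 1024);
    HdfsProtos.BlockChecksumOptionsProto proto = PBHelperClient.convert(options);
    BlockChecksumOptions back = PBHelperClient.convert(proto);
    assert back.getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC;
    assert back.getStripeLength() == 1024 * 1024;
    System.out.println(back); // prints the type and stripe length
  }
}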
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index 2356201f04..384da54ea1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -148,8 +148,9 @@ message OpCopyBlockProto { required BaseHeaderProto header = 1; } -message OpBlockChecksumProto { +message OpBlockChecksumProto { required BaseHeaderProto header = 1; + optional BlockChecksumOptionsProto blockChecksumOptions = 2; } message OpBlockGroupChecksumProto { @@ -160,6 +161,7 @@ message OpBlockGroupChecksumProto { required ErasureCodingPolicyProto ecPolicy = 4; repeated uint32 blockIndices = 5; required uint64 requestedNumBytes = 6; + optional BlockChecksumOptionsProto blockChecksumOptions = 7; } /** @@ -313,8 +315,9 @@ message DNTransferAckProto { message OpBlockChecksumResponseProto { required uint32 bytesPerCrc = 1; required uint64 crcPerBlock = 2; - required bytes md5 = 3; + required bytes blockChecksum = 3; optional ChecksumTypeProto crcType = 4; + optional BlockChecksumOptionsProto blockChecksumOptions = 5; } message OpCustomProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 29d0b4e179..441b9d6b76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -480,6 +480,27 @@ enum ChecksumTypeProto { CHECKSUM_CRC32C = 2; } +enum BlockChecksumTypeProto { + MD5CRC = 1; // BlockChecksum obtained by taking the MD5 digest of chunk CRCs + COMPOSITE_CRC = 2; // Chunk-independent CRC, optionally striped +} + +/** + * Algorithms/types denoting how block-level checksums are computed using + * lower-level chunk checksums/CRCs. + * These options should be kept in sync with + * org.apache.hadoop.hdfs.protocol.BlockChecksumOptions. + */ +message BlockChecksumOptionsProto { + optional BlockChecksumTypeProto blockChecksumType = 1 [default = MD5CRC]; + + // Only used if blockChecksumType specifies a striped format, such as + // COMPOSITE_CRC. 
If so, then the blockChecksum in the response is expected + // to be the concatenation of N crcs, where + // N == ((requestedLength - 1) / stripedLength) + 1 + optional uint64 stripeLength = 2; +} + /** * HDFS Server Defaults */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index bab2e8da5f..5d2d1f890b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -301,8 +301,9 @@ private void opBlockChecksum(DataInputStream in) throws IOException { TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); try { - blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()), - PBHelperClient.convert(proto.getHeader().getToken())); + blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()), + PBHelperClient.convert(proto.getHeader().getToken()), + PBHelperClient.convert(proto.getBlockChecksumOptions())); } finally { if (traceScope != null) traceScope.close(); } @@ -325,7 +326,8 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException { try { blockGroupChecksum(stripedBlockInfo, PBHelperClient.convert(proto.getHeader().getToken()), - proto.getRequestedNumBytes()); + proto.getRequestedNumBytes(), + PBHelperClient.convert(proto.getBlockChecksumOptions())); } finally { if (traceScope != null) { traceScope.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java index e99911bdef..3388855f8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java @@ -21,6 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions; +import org.apache.hadoop.hdfs.protocol.BlockChecksumType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -32,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumCompositeCrcReconstructor; +import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumMd5CrcReconstructor; import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumReconstructor; import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; @@ -40,6 +44,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.CrcComposer; +import org.apache.hadoop.util.CrcUtil; import org.apache.hadoop.util.DataChecksum; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -70,6 +76,7 @@ private BlockChecksumHelper() { */ static abstract class AbstractBlockChecksumComputer { private final DataNode datanode; + private final BlockChecksumOptions blockChecksumOptions; private byte[] outBytes; private int bytesPerCRC = -1; @@ -77,8 +84,11 @@ static abstract class AbstractBlockChecksumComputer { private long crcPerBlock = -1; private int checksumSize = -1; - AbstractBlockChecksumComputer(DataNode datanode) throws IOException { + AbstractBlockChecksumComputer( + DataNode datanode, + BlockChecksumOptions blockChecksumOptions) throws IOException { this.datanode = datanode; + this.blockChecksumOptions = blockChecksumOptions; } abstract void compute() throws IOException; @@ -92,6 +102,10 @@ DataNode getDatanode() { return datanode; } + BlockChecksumOptions getBlockChecksumOptions() { + return blockChecksumOptions; + } + InputStream getBlockInputStream(ExtendedBlock block, long seekOffset) throws IOException { return datanode.data.getBlockInputStream(block, seekOffset); @@ -155,8 +169,10 @@ static abstract class BlockChecksumComputer private DataChecksum checksum; BlockChecksumComputer(DataNode datanode, - ExtendedBlock block) throws IOException { - super(datanode); + ExtendedBlock block, + BlockChecksumOptions blockChecksumOptions) + throws IOException { + super(datanode, blockChecksumOptions); this.block = block; this.requestLength = block.getNumBytes(); Preconditions.checkArgument(requestLength >= 0); @@ -268,8 +284,10 @@ byte[] crcPartialBlock() throws IOException { static class ReplicatedBlockChecksumComputer extends BlockChecksumComputer { ReplicatedBlockChecksumComputer(DataNode datanode, - ExtendedBlock block) throws IOException { - super(datanode, block); + ExtendedBlock block, + BlockChecksumOptions blockChecksumOptions) + throws IOException { + super(datanode, block, blockChecksumOptions); } @Override @@ -277,22 +295,38 @@ void compute() throws IOException { try { readHeader(); - MD5Hash md5out; - if (isPartialBlk() && getCrcPerBlock() > 0) { - md5out = checksumPartialBlock(); - } else { - md5out = checksumWholeBlock(); + BlockChecksumType type = + getBlockChecksumOptions().getBlockChecksumType(); + switch (type) { + case MD5CRC: + computeMd5Crc(); + break; + case COMPOSITE_CRC: + computeCompositeCrc(getBlockChecksumOptions().getStripeLength()); + break; + default: + throw new IOException(String.format( + "Unrecognized BlockChecksumType: %s", type)); } - setOutBytes(md5out.getDigest()); - - LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}", - getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out); } finally { IOUtils.closeStream(getChecksumIn()); IOUtils.closeStream(getMetadataIn()); } } + private void computeMd5Crc() throws IOException { + MD5Hash md5out; + if (isPartialBlk() && getCrcPerBlock() > 0) { + md5out = checksumPartialBlock(); + } else { + md5out = checksumWholeBlock(); + } + setOutBytes(md5out.getDigest()); + + LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}", + getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out); + } + private MD5Hash checksumWholeBlock() throws IOException { MD5Hash md5out = MD5Hash.digest(getChecksumIn()); return md5out; @@ -320,6 +354,56 @@ private MD5Hash checksumPartialBlock() throws IOException { return new MD5Hash(digester.digest()); } + + private void computeCompositeCrc(long stripeLength) throws IOException { + long checksumDataLength = + Math.min(getVisibleLength(), getRequestLength()); + if (stripeLength <= 0 || stripeLength > checksumDataLength) { + 
stripeLength = checksumDataLength; + } + + CrcComposer crcComposer = CrcComposer.newStripedCrcComposer( + getCrcType(), getBytesPerCRC(), stripeLength); + DataInputStream checksumIn = getChecksumIn(); + + // Whether getting the checksum for the entire block (which itself may + // not be a full block size and may have a final chunk smaller than + // getBytesPerCRC()), we begin with a number of full chunks, all of size + // getBytesPerCRC(). + long numFullChunks = checksumDataLength / getBytesPerCRC(); + crcComposer.update(checksumIn, numFullChunks, getBytesPerCRC()); + + // There may be a final partial chunk that is not full-sized. Unlike the + // MD5 case, we still consider this a "partial chunk" even if + // getRequestLength() == getVisibleLength(), since the CRC composition + // depends on the byte size of that final chunk, even if it already has a + // precomputed CRC stored in metadata. So there are two cases: + // 1. Reading only part of a block via getRequestLength(); we get the + // crcPartialBlock() explicitly. + // 2. Reading full visible length; the partial chunk already has a CRC + // stored in block metadata, so we just continue reading checksumIn. + long partialChunkSize = checksumDataLength % getBytesPerCRC(); + if (partialChunkSize > 0) { + if (isPartialBlk()) { + byte[] partialChunkCrcBytes = crcPartialBlock(); + crcComposer.update( + partialChunkCrcBytes, 0, partialChunkCrcBytes.length, + partialChunkSize); + } else { + int partialChunkCrc = checksumIn.readInt(); + crcComposer.update(partialChunkCrc, partialChunkSize); + } + } + + byte[] composedCrcs = crcComposer.digest(); + setOutBytes(composedCrcs); + if (LOG.isDebugEnabled()) { + LOG.debug( + "block={}, getBytesPerCRC={}, crcPerBlock={}, compositeCrc={}", + getBlock(), getBytesPerCRC(), getCrcPerBlock(), + CrcUtil.toMultiCrcString(composedCrcs)); + } + } } /** @@ -335,19 +419,29 @@ static class BlockGroupNonStripedChecksumComputer private final byte[] blockIndices; private final long requestedNumBytes; - private final DataOutputBuffer md5writer = new DataOutputBuffer(); + private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer(); - BlockGroupNonStripedChecksumComputer(DataNode datanode, - StripedBlockInfo stripedBlockInfo, - long requestedNumBytes) + // Keeps track of the positions within blockChecksumBuf where each data + // block's checksum begins; for fixed-size block checksums this is easily + // calculated as a multiple of the checksum size, but for striped block + // CRCs, it's less error-prone to simply keep track of exact byte offsets + // before each block checksum is populated into the buffer. 
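[Editor's note, not part of the patch] The flow in computeCompositeCrc() above boils down to feeding each per-chunk CRC into a CrcComposer together with the number of data bytes that CRC covers, then reading the single composed CRC back out. A minimal sketch under assumed inputs: the chunkCrcs array, bytesPerCrc, and lastChunkSize are hypothetical, CRC32C is just one possible checksum type, and at least one chunk CRC is assumed.

import java.io.IOException;
import org.apache.hadoop.util.CrcComposer;
import org.apache.hadoop.util.CrcUtil;
import org.apache.hadoop.util.DataChecksum;

public class ComposeBlockCrcExample {
  /** Collapse per-chunk CRCs into one block-level composite CRC. */
  static int composeBlockCrc(int[] chunkCrcs, int bytesPerCrc, long lastChunkSize)
      throws IOException {
    CrcComposer composer =
        CrcComposer.newCrcComposer(DataChecksum.Type.CRC32C, bytesPerCrc);
    // Every chunk except the last covers exactly bytesPerCrc data bytes.
    for (int i = 0; i < chunkCrcs.length - 1; ++i) {
      composer.update(chunkCrcs[i], bytesPerCrc);
    }
    // The final chunk may be shorter; its true byte length changes the result.
    composer.update(chunkCrcs[chunkCrcs.length - 1], lastChunkSize);
    return CrcUtil.readInt(composer.digest(), 0); // single 4-byte CRC
  }
}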
+ private final int[] blockChecksumPositions; + + BlockGroupNonStripedChecksumComputer( + DataNode datanode, + StripedBlockInfo stripedBlockInfo, + long requestedNumBytes, + BlockChecksumOptions blockChecksumOptions) throws IOException { - super(datanode); + super(datanode, blockChecksumOptions); this.blockGroup = stripedBlockInfo.getBlock(); this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy(); this.datanodes = stripedBlockInfo.getDatanodes(); this.blockTokens = stripedBlockInfo.getBlockTokens(); this.blockIndices = stripedBlockInfo.getBlockIndices(); this.requestedNumBytes = requestedNumBytes; + this.blockChecksumPositions = new int[this.ecPolicy.getNumDataUnits()]; } private static class LiveBlockInfo { @@ -383,6 +477,9 @@ void compute() throws IOException { } long checksumLen = 0; for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) { + // Before populating the blockChecksum at this index, record the byte + // offset where it will begin. + blockChecksumPositions[idx] = blockChecksumBuf.getLength(); try { ExtendedBlock block = getInternalBlock(numDataUnits, idx); @@ -409,8 +506,75 @@ void compute() throws IOException { } } - MD5Hash md5out = MD5Hash.digest(md5writer.getData()); - setOutBytes(md5out.getDigest()); + BlockChecksumType type = getBlockChecksumOptions().getBlockChecksumType(); + switch (type) { + case MD5CRC: + MD5Hash md5out = MD5Hash.digest(blockChecksumBuf.getData()); + setOutBytes(md5out.getDigest()); + break; + case COMPOSITE_CRC: + byte[] digest = reassembleNonStripedCompositeCrc(checksumLen); + setOutBytes(digest); + break; + default: + throw new IOException(String.format( + "Unrecognized BlockChecksumType: %s", type)); + } + } + + /** + * @param checksumLen The sum of bytes associated with the block checksum + * data being digested into a block-group level checksum. + */ + private byte[] reassembleNonStripedCompositeCrc(long checksumLen) + throws IOException { + int numDataUnits = ecPolicy.getNumDataUnits(); + CrcComposer crcComposer = CrcComposer.newCrcComposer( + getCrcType(), ecPolicy.getCellSize()); + + // This should hold all the cell-granularity checksums of blk0 + // followed by all cell checksums of blk1, etc. We must unstripe the + // cell checksums in order of logical file bytes. Also, note that the + // length of this array may not equal the the number of actually valid + // bytes in the buffer (blockChecksumBuf.getLength()). + byte[] flatBlockChecksumData = blockChecksumBuf.getData(); + + // Initialize byte-level cursors to where each block's checksum begins + // inside the combined flattened buffer. + int[] blockChecksumCursors = new int[numDataUnits]; + for (int idx = 0; idx < numDataUnits; ++idx) { + blockChecksumCursors[idx] = blockChecksumPositions[idx]; + } + + // Reassemble cell-level CRCs in the right order. + long numFullCells = checksumLen / ecPolicy.getCellSize(); + for (long cellIndex = 0; cellIndex < numFullCells; ++cellIndex) { + int blockIndex = (int) (cellIndex % numDataUnits); + int checksumCursor = blockChecksumCursors[blockIndex]; + int cellCrc = CrcUtil.readInt( + flatBlockChecksumData, checksumCursor); + blockChecksumCursors[blockIndex] += 4; + crcComposer.update(cellCrc, ecPolicy.getCellSize()); + } + if (checksumLen % ecPolicy.getCellSize() != 0) { + // Final partial cell. 
+ int blockIndex = (int) (numFullCells % numDataUnits); + int checksumCursor = blockChecksumCursors[blockIndex]; + int cellCrc = CrcUtil.readInt( + flatBlockChecksumData, checksumCursor); + blockChecksumCursors[blockIndex] += 4; + crcComposer.update(cellCrc, checksumLen % ecPolicy.getCellSize()); + } + byte[] digest = crcComposer.digest(); + if (LOG.isDebugEnabled()) { + LOG.debug("flatBlockChecksumData.length={}, numDataUnits={}, " + + "checksumLen={}, digest={}", + flatBlockChecksumData.length, + numDataUnits, + checksumLen, + CrcUtil.toSingleCrcString(digest)); + } + return digest; } private ExtendedBlock getInternalBlock(int numDataUnits, int idx) { @@ -437,8 +601,26 @@ private void checksumBlock(ExtendedBlock block, int blockIdx, LOG.debug("write to {}: {}, block={}", getDatanode(), Op.BLOCK_CHECKSUM, block); - // get block MD5 - createSender(pair).blockChecksum(block, blockToken); + // get block checksum + // A BlockGroupCheckum of type COMPOSITE_CRC uses underlying + // BlockChecksums also of type COMPOSITE_CRC but with + // stripeLength == ecPolicy.getCellSize(). + BlockChecksumOptions childOptions; + BlockChecksumType groupChecksumType = + getBlockChecksumOptions().getBlockChecksumType(); + switch (groupChecksumType) { + case MD5CRC: + childOptions = getBlockChecksumOptions(); + break; + case COMPOSITE_CRC: + childOptions = new BlockChecksumOptions( + BlockChecksumType.COMPOSITE_CRC, ecPolicy.getCellSize()); + break; + default: + throw new IOException( + "Unknown BlockChecksumType: " + groupChecksumType); + } + createSender(pair).blockChecksum(block, blockToken, childOptions); final DataTransferProtos.BlockOpResponseProto reply = DataTransferProtos.BlockOpResponseProto.parseFrom( @@ -463,10 +645,37 @@ private void checksumBlock(ExtendedBlock block, int blockIdx, setOrVerifyChecksumProperties(blockIdx, checksumData.getBytesPerCrc(), checksumData.getCrcPerBlock(), ct); - //read md5 - final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray()); - md5.write(md5writer); - LOG.debug("got reply from datanode:{}, md5={}", targetDatanode, md5); + + switch (groupChecksumType) { + case MD5CRC: + //read md5 + final MD5Hash md5 = + new MD5Hash(checksumData.getBlockChecksum().toByteArray()); + md5.write(blockChecksumBuf); + LOG.debug("got reply from datanode:{}, md5={}", + targetDatanode, md5); + break; + case COMPOSITE_CRC: + BlockChecksumType returnedType = PBHelperClient.convert( + checksumData.getBlockChecksumOptions().getBlockChecksumType()); + if (returnedType != BlockChecksumType.COMPOSITE_CRC) { + throw new IOException(String.format( + "Unexpected blockChecksumType '%s', expecting COMPOSITE_CRC", + returnedType)); + } + byte[] checksumBytes = + checksumData.getBlockChecksum().toByteArray(); + blockChecksumBuf.write(checksumBytes, 0, checksumBytes.length); + if (LOG.isDebugEnabled()) { + LOG.debug("got reply from datanode:{} for blockIdx:{}, checksum:{}", + targetDatanode, blockIdx, + CrcUtil.toMultiCrcString(checksumBytes)); + } + break; + default: + throw new IOException( + "Unknown BlockChecksumType: " + groupChecksumType); + } } } @@ -489,10 +698,16 @@ private void recalculateChecksum(int errBlkIndex, long blockLength) StripedReconstructionInfo stripedReconInfo = new StripedReconstructionInfo( blockGroup, ecPolicy, blockIndices, datanodes, errIndices); + BlockChecksumType groupChecksumType = + getBlockChecksumOptions().getBlockChecksumType(); final StripedBlockChecksumReconstructor checksumRecon = - new StripedBlockChecksumReconstructor( + groupChecksumType == 
BlockChecksumType.COMPOSITE_CRC ? + new StripedBlockChecksumCompositeCrcReconstructor( getDatanode().getErasureCodingWorker(), stripedReconInfo, - md5writer, blockLength); + blockChecksumBuf, blockLength) : + new StripedBlockChecksumMd5CrcReconstructor( + getDatanode().getErasureCodingWorker(), stripedReconInfo, + blockChecksumBuf, blockLength); checksumRecon.reconstruct(); DataChecksum checksum = checksumRecon.getChecksum(); @@ -501,8 +716,8 @@ private void recalculateChecksum(int errBlkIndex, long blockLength) setOrVerifyChecksumProperties(errBlkIndex, checksum.getBytesPerChecksum(), crcPerBlock, checksum.getChecksumType()); - LOG.debug("Recalculated checksum for the block index:{}, md5={}", - errBlkIndex, checksumRecon.getMD5()); + LOG.debug("Recalculated checksum for the block index:{}, checksum={}", + errBlkIndex, checksumRecon.getDigestObject()); } private void setOrVerifyChecksumProperties(int blockIdx, int bpc, @@ -524,8 +739,16 @@ private void setOrVerifyChecksumProperties(int blockIdx, int bpc, setCrcType(ct); } else if (getCrcType() != DataChecksum.Type.MIXED && getCrcType() != ct) { - // if crc types are mixed in a file - setCrcType(DataChecksum.Type.MIXED); + BlockChecksumType groupChecksumType = + getBlockChecksumOptions().getBlockChecksumType(); + if (groupChecksumType == BlockChecksumType.COMPOSITE_CRC) { + throw new IOException(String.format( + "BlockChecksumType COMPOSITE_CRC doesn't support MIXED " + + "underlying types; previous block was %s, next block is %s", + getCrcType(), ct)); + } else { + setCrcType(DataChecksum.Type.MIXED); + } } if (blockIdx == 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index d0ded89b11..0bb1987a11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -968,15 +969,16 @@ public void transferBlock(final ExtendedBlock blk, @Override public void blockChecksum(ExtendedBlock block, - Token blockToken) + Token blockToken, + BlockChecksumOptions blockChecksumOptions) throws IOException { updateCurrentThreadName("Getting checksum for block " + block); final DataOutputStream out = new DataOutputStream( getOutputStream()); checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ); - BlockChecksumComputer maker = - new ReplicatedBlockChecksumComputer(datanode, block); + BlockChecksumComputer maker = new ReplicatedBlockChecksumComputer( + datanode, block, blockChecksumOptions); try { maker.compute(); @@ -987,8 +989,10 @@ public void blockChecksum(ExtendedBlock block, .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder() .setBytesPerCrc(maker.getBytesPerCRC()) .setCrcPerBlock(maker.getCrcPerBlock()) - .setMd5(ByteString.copyFrom(maker.getOutBytes())) - .setCrcType(PBHelperClient.convert(maker.getCrcType()))) + .setBlockChecksum(ByteString.copyFrom(maker.getOutBytes())) + 
.setCrcType(PBHelperClient.convert(maker.getCrcType())) + .setBlockChecksumOptions( + PBHelperClient.convert(blockChecksumOptions))) .build() .writeDelimitedTo(out); out.flush(); @@ -1007,7 +1011,9 @@ public void blockChecksum(ExtendedBlock block, @Override public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, - final Token blockToken, long requestedNumBytes) + final Token blockToken, + long requestedNumBytes, + BlockChecksumOptions blockChecksumOptions) throws IOException { final ExtendedBlock block = stripedBlockInfo.getBlock(); updateCurrentThreadName("Getting checksum for block group" + @@ -1018,7 +1024,7 @@ public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, AbstractBlockChecksumComputer maker = new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo, - requestedNumBytes); + requestedNumBytes, blockChecksumOptions); try { maker.compute(); @@ -1029,8 +1035,10 @@ public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder() .setBytesPerCrc(maker.getBytesPerCRC()) .setCrcPerBlock(maker.getCrcPerBlock()) - .setMd5(ByteString.copyFrom(maker.getOutBytes())) - .setCrcType(PBHelperClient.convert(maker.getCrcType()))) + .setBlockChecksum(ByteString.copyFrom(maker.getOutBytes())) + .setCrcType(PBHelperClient.convert(maker.getCrcType())) + .setBlockChecksumOptions( + PBHelperClient.convert(blockChecksumOptions))) .build() .writeDelimitedTo(out); out.flush(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java new file mode 100644 index 0000000000..afcc8cbef6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.util.CrcComposer; + +/** + * Computes striped composite CRCs over reconstructed chunk CRCs. 
+ */ +@InterfaceAudience.Private +public class StripedBlockChecksumCompositeCrcReconstructor + extends StripedBlockChecksumReconstructor { + private final int ecPolicyCellSize; + + private byte[] digestValue; + private CrcComposer digester; + + public StripedBlockChecksumCompositeCrcReconstructor( + ErasureCodingWorker worker, + StripedReconstructionInfo stripedReconInfo, + DataOutputBuffer checksumWriter, + long requestedBlockLength) throws IOException { + super(worker, stripedReconInfo, checksumWriter, requestedBlockLength); + this.ecPolicyCellSize = stripedReconInfo.getEcPolicy().getCellSize(); + } + + @Override + public Object getDigestObject() { + return digestValue; + } + + @Override + void prepareDigester() throws IOException { + digester = CrcComposer.newStripedCrcComposer( + getChecksum().getChecksumType(), + getChecksum().getBytesPerChecksum(), + ecPolicyCellSize); + } + + @Override + void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum) + throws IOException { + if (digester == null) { + throw new IOException(String.format( + "Called updatedDigester with checksumBytes.length=%d, " + + "dataBytesPerChecksum=%d but digester is null", + checksumBytes.length, dataBytesPerChecksum)); + } + digester.update( + checksumBytes, 0, checksumBytes.length, dataBytesPerChecksum); + } + + @Override + void commitDigest() throws IOException { + if (digester == null) { + throw new IOException("Called commitDigest() but digester is null"); + } + digestValue = digester.digest(); + getChecksumWriter().write(digestValue, 0, digestValue.length); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java new file mode 100644 index 0000000000..2f809b8478 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import java.io.IOException; +import java.security.MessageDigest; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.MD5Hash; + +/** + * Computes running MD5-of-CRC over reconstructed chunk CRCs. 
+ */ +@InterfaceAudience.Private +public class StripedBlockChecksumMd5CrcReconstructor + extends StripedBlockChecksumReconstructor { + private MD5Hash md5; + private MessageDigest digester; + + public StripedBlockChecksumMd5CrcReconstructor(ErasureCodingWorker worker, + StripedReconstructionInfo stripedReconInfo, + DataOutputBuffer checksumWriter, + long requestedBlockLength) throws IOException { + super(worker, stripedReconInfo, checksumWriter, requestedBlockLength); + } + + @Override + public Object getDigestObject() { + return md5; + } + + @Override + void prepareDigester() throws IOException { + digester = MD5Hash.getDigester(); + } + + @Override + void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum) + throws IOException { + if (digester == null) { + throw new IOException(String.format( + "Called updatedDigester with checksumBytes.length=%d, " + + "dataBytesPerChecksum=%d but digester is null", + checksumBytes.length, dataBytesPerChecksum)); + } + digester.update(checksumBytes, 0, checksumBytes.length); + } + + @Override + void commitDigest() throws IOException { + if (digester == null) { + throw new IOException("Called commitDigest() but digester is null"); + } + byte[] digest = digester.digest(); + md5 = new MD5Hash(digest); + md5.write(getChecksumWriter()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java index d530a8eaac..b2e64966a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java @@ -19,12 +19,10 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.security.MessageDigest; import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.MD5Hash; /** * StripedBlockChecksumReconstructor reconstruct one or more missed striped @@ -33,18 +31,17 @@ * using the newly reconstructed block. 
*/ @InterfaceAudience.Private -public class StripedBlockChecksumReconstructor extends StripedReconstructor { - +public abstract class StripedBlockChecksumReconstructor + extends StripedReconstructor { private ByteBuffer targetBuffer; private final byte[] targetIndices; private byte[] checksumBuf; private DataOutputBuffer checksumWriter; - private MD5Hash md5; private long checksumDataLen; private long requestedLen; - public StripedBlockChecksumReconstructor(ErasureCodingWorker worker, + protected StripedBlockChecksumReconstructor(ErasureCodingWorker worker, StripedReconstructionInfo stripedReconInfo, DataOutputBuffer checksumWriter, long requestedBlockLength) throws IOException { @@ -72,8 +69,9 @@ private void init() throws IOException { checksumBuf = new byte[tmpLen]; } + @Override public void reconstruct() throws IOException { - MessageDigest digester = MD5Hash.getDigester(); + prepareDigester(); long maxTargetLength = getMaxTargetLength(); try { while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) { @@ -88,24 +86,54 @@ public void reconstruct() throws IOException { reconstructTargets(toReconstructLen); // step3: calculate checksum - checksumDataLen += checksumWithTargetOutput(targetBuffer.array(), - toReconstructLen, digester); + checksumDataLen += checksumWithTargetOutput( + targetBuffer.array(), toReconstructLen); updatePositionInBlock(toReconstructLen); requestedLen -= toReconstructLen; clearBuffers(); } - byte[] digest = digester.digest(); - md5 = new MD5Hash(digest); - md5.write(checksumWriter); + commitDigest(); } finally { cleanup(); } } - private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen, - MessageDigest digester) throws IOException { + /** + * Should return a representation of a completed/reconstructed digest which + * is suitable for debug printing. + */ + public abstract Object getDigestObject(); + + /** + * This will be called before starting reconstruction. + */ + abstract void prepareDigester() throws IOException; + + /** + * This will be called repeatedly with chunked checksums computed in-flight + * over reconstructed data. + * + * @param dataBytesPerChecksum the number of underlying data bytes + * corresponding to each checksum inside {@code checksumBytes}. + */ + abstract void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum) + throws IOException; + + /** + * This will be called when reconstruction of entire requested length is + * complete and any final digests should be committed to + * implementation-specific output fields. + */ + abstract void commitDigest() throws IOException; + + protected DataOutputBuffer getChecksumWriter() { + return checksumWriter; + } + + private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen) + throws IOException { long checksumDataLength = 0; // Calculate partial block checksum. There are two cases. 
// case-1) length of data bytes which is fraction of bytesPerCRC @@ -128,7 +156,7 @@ private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen, checksumBuf = new byte[checksumRemaining]; getChecksum().calculateChunkedSums(outputData, dataOffset, remainingLen, checksumBuf, 0); - digester.update(checksumBuf, 0, checksumBuf.length); + updateDigester(checksumBuf, getChecksum().getBytesPerChecksum()); checksumDataLength = checksumBuf.length; dataOffset = remainingLen; } @@ -139,7 +167,7 @@ private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen, getChecksum().reset(); getChecksum().update(outputData, dataOffset, partialLength); getChecksum().writeValue(partialCrc, 0, true); - digester.update(partialCrc); + updateDigester(partialCrc, partialLength); checksumDataLength += partialCrc.length; } @@ -151,7 +179,7 @@ private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen, outputData.length, checksumBuf, 0); // updates digest using the checksum array of bytes - digester.update(checksumBuf, 0, checksumBuf.length); + updateDigester(checksumBuf, getChecksum().getBytesPerChecksum()); return checksumBuf.length; } @@ -176,10 +204,6 @@ private void clearBuffers() { targetBuffer.clear(); } - public MD5Hash getMD5() { - return md5; - } - public long getChecksumDataLen() { return checksumDataLen; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java index ce1254cfd0..29c0078e95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java @@ -79,6 +79,7 @@ public void run() { } } + @Override void reconstruct() throws IOException { while (getPositionInBlock() < getMaxTargetLength()) { DataNodeFaultInjector.get().stripedBlockReconstruction(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 1e83325a14..f8cce60c7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3540,6 +3540,17 @@ + + dfs.checksum.combine.mode + MD5MD5CRC + + Defines how lower-level chunk/block checksums are combined into file-level + checksums; the original MD5MD5CRC mode is not comparable between files + with different block layouts, while modes like COMPOSITE_CRC are + comparable independently of block layout. 
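[Editor's note, not part of the patch] Following the dfs.checksum.combine.mode description above, a hedged usage sketch: with the property set to COMPOSITE_CRC on the client, two files holding the same bytes are expected to report equal FileChecksums even when written with different block sizes or EC layouts. The namenode URI and paths are invented for illustration.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class CompositeCrcChecksumExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC");
    try (FileSystem fs =
        FileSystem.newInstance(URI.create("hdfs://namenode:8020"), conf)) {
      // Both paths are assumed to hold identical bytes but were written
      // with different block sizes.
      FileChecksum a = fs.getFileChecksum(new Path("/data/copy-128m-blocks"));
      FileChecksum b = fs.getFileChecksum(new Path("/data/copy-64m-blocks"));
      // Expected to print true for identical contents under COMPOSITE_CRC.
      System.out.println(a.getAlgorithmName() + " equal: " + a.equals(b));
    }
  }
}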
+ + + dfs.client.block.write.locateFollowingBlock.retries 5 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 4f9f260081..adede5fa5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -857,6 +857,20 @@ public static void writeFile(FileSystem fs, Path p, byte[] bytes) } } + /* Write the given bytes to the given file using the specified blockSize */ + public static void writeFile( + FileSystem fs, Path p, byte[] bytes, long blockSize) + throws IOException { + if (fs.exists(p)) { + fs.delete(p, true); + } + try (InputStream is = new ByteArrayInputStream(bytes); + FSDataOutputStream os = fs.create( + p, false, 4096, fs.getDefaultReplication(p), blockSize)) { + IOUtils.copyBytes(is, os, bytes.length); + } + } + /* Write the given string to the given file */ public static void writeFile(FileSystem fs, Path p, String s) throws IOException { @@ -901,14 +915,27 @@ public static void appendFile(FileSystem fs, Path p, int length) */ public static void appendFileNewBlock(DistributedFileSystem fs, Path p, int length) throws IOException { - assert fs.exists(p); assert length >= 0; byte[] toAppend = new byte[length]; Random random = new Random(); random.nextBytes(toAppend); + appendFileNewBlock(fs, p, toAppend); + } + + /** + * Append specified bytes to a given file, starting with new block. + * + * @param fs The file system + * @param p Path of the file to append + * @param bytes The data to append + * @throws IOException + */ + public static void appendFileNewBlock(DistributedFileSystem fs, + Path p, byte[] bytes) throws IOException { + assert fs.exists(p); try (FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) { - out.write(toAppend); + out.write(bytes); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java index d201ce199c..0ff2d4b3cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -30,7 +31,9 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -74,6 +77,9 @@ public class TestFileChecksum { private String stripedFile2 = ecDir + "/stripedFileChecksum2"; private String replicatedFile = "/replicatedFileChecksum"; + @Rule + public ExpectedException exception = ExpectedException.none(); + @Before public void setup() throws IOException { int numDNs = dataBlocks + parityBlocks + 2; @@ -83,6 +89,7 @@ public void setup() throws IOException { false); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); 
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + customizeConf(conf); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); Path ecPath = new Path(ecDir); cluster.getFileSystem().mkdir(ecPath, FsPermission.getDirDefault()); @@ -106,6 +113,39 @@ public void tearDown() { } } + /** + * Subclasses may customize the conf to run the full set of tests under + * different conditions. + */ + protected void customizeConf(Configuration preparedConf) { + } + + /** + * Subclasses may override this method to indicate whether equivalent files + * in striped and replicated formats are expected to have the same + * overall FileChecksum. + */ + protected boolean expectComparableStripedAndReplicatedFiles() { + return false; + } + + /** + * Subclasses may override this method to indicate whether equivalent files + * in replicated formats with different block sizes are expected to have the + * same overall FileChecksum. + */ + protected boolean expectComparableDifferentBlockSizeReplicatedFiles() { + return false; + } + + /** + * Subclasses may override this method to indicate whether checksums are + * supported for files where different blocks have different bytesPerCRC. + */ + protected boolean expectSupportForSingleFileMixedBytesPerChecksum() { + return false; + } + @Test(timeout = 90000) public void testStripedFileChecksum1() throws Exception { int length = 0; @@ -182,7 +222,30 @@ public void testStripedAndReplicatedFileChecksum() throws Exception { FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile, 10, false); - Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum)); + if (expectComparableStripedAndReplicatedFiles()) { + Assert.assertEquals(stripedFileChecksum1, replicatedFileChecksum); + } else { + Assert.assertNotEquals(stripedFileChecksum1, replicatedFileChecksum); + } + } + + @Test(timeout = 90000) + public void testDifferentBlockSizeReplicatedFileChecksum() throws Exception { + byte[] fileData = StripedFileTestUtil.generateBytes(fileSize); + String replicatedFile1 = "/replicatedFile1"; + String replicatedFile2 = "/replicatedFile2"; + DFSTestUtil.writeFile( + fs, new Path(replicatedFile1), fileData, blockSize); + DFSTestUtil.writeFile( + fs, new Path(replicatedFile2), fileData, blockSize / 2); + FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false); + FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false); + + if (expectComparableDifferentBlockSizeReplicatedFiles()) { + Assert.assertEquals(checksum1, checksum2); + } else { + Assert.assertNotEquals(checksum1, checksum2); + } } @Test(timeout = 90000) @@ -471,6 +534,40 @@ public void testStripedFileChecksumWithMissedDataBlocksRangeQuery20() bytesPerCRC - 1); } + @Test(timeout = 90000) + public void testMixedBytesPerChecksum() throws Exception { + int fileLength = bytesPerCRC * 3; + byte[] fileData = StripedFileTestUtil.generateBytes(fileLength); + String replicatedFile1 = "/replicatedFile1"; + + // Split file into two parts. + byte[] fileDataPart1 = new byte[bytesPerCRC * 2]; + System.arraycopy(fileData, 0, fileDataPart1, 0, fileDataPart1.length); + byte[] fileDataPart2 = new byte[fileData.length - fileDataPart1.length]; + System.arraycopy( + fileData, fileDataPart1.length, fileDataPart2, 0, fileDataPart2.length); + + DFSTestUtil.writeFile(fs, new Path(replicatedFile1), fileDataPart1); + + // Modify bytesPerCRC for second part that we append as separate block. 
+ conf.setInt( + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesPerCRC / 2); + DFSTestUtil.appendFileNewBlock( + ((DistributedFileSystem) FileSystem.newInstance(conf)), + new Path(replicatedFile1), fileDataPart2); + + if (expectSupportForSingleFileMixedBytesPerChecksum()) { + String replicatedFile2 = "/replicatedFile2"; + DFSTestUtil.writeFile(fs, new Path(replicatedFile2), fileData); + FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false); + FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false); + Assert.assertEquals(checksum1, checksum2); + } else { + exception.expect(IOException.class); + FileChecksum checksum = getFileChecksum(replicatedFile1, -1, false); + } + } + private FileChecksum getFileChecksum(String filePath, int range, boolean killDn) throws Exception { int dnIdxToDie = -1; @@ -537,4 +634,4 @@ int getDataNodeToKill(String filePath) throws IOException { return -1; } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java new file mode 100644 index 0000000000..87fb7da6e2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + +/** + * End-to-end tests for COMPOSITE_CRC combine mode. 
+ */ +public class TestFileChecksumCompositeCrc extends TestFileChecksum { + @Override + protected void customizeConf(Configuration conf) { + conf.set( + HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC"); + } + + @Override + protected boolean expectComparableStripedAndReplicatedFiles() { + return true; + } + + @Override + protected boolean expectComparableDifferentBlockSizeReplicatedFiles() { + return true; + } + + @Override + protected boolean expectSupportForSingleFileMixedBytesPerChecksum() { + return true; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index da56c15397..22f84c5395 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockChecksumType; import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -682,6 +683,19 @@ public void testChecksumTypeProto() { HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C); } + @Test + public void testBlockChecksumTypeProto() { + assertEquals(BlockChecksumType.MD5CRC, + PBHelperClient.convert(HdfsProtos.BlockChecksumTypeProto.MD5CRC)); + assertEquals(BlockChecksumType.COMPOSITE_CRC, + PBHelperClient.convert( + HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC)); + assertEquals(PBHelperClient.convert(BlockChecksumType.MD5CRC), + HdfsProtos.BlockChecksumTypeProto.MD5CRC); + assertEquals(PBHelperClient.convert(BlockChecksumType.COMPOSITE_CRC), + HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC); + } + @Test public void testAclEntryProto() { // All fields populated. diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java index bbe7e8d925..30e4683684 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java @@ -73,20 +73,42 @@ public class TestCopyMapper { private static final String SOURCE_PATH = "/tmp/source"; private static final String TARGET_PATH = "/tmp/target"; - private static Configuration configuration; - @BeforeClass public static void setup() throws Exception { - configuration = getConfigurationForCluster(); - cluster = new MiniDFSCluster.Builder(configuration) + Configuration configuration = getConfigurationForCluster(); + setCluster(new MiniDFSCluster.Builder(configuration) .numDataNodes(1) .format(true) - .build(); + .build()); } - private static Configuration getConfigurationForCluster() throws IOException { + /** + * Subclasses may override this method to indicate whether copying files with + * non-default block sizes without setting BLOCKSIZE as a preserved attribute + * is expected to succeed with CRC checks enabled. 
+ */ + protected boolean expectDifferentBlockSizesMultipleBlocksToSucceed() { + return false; + } + + /** + * Subclasses may override this method to indicate whether copying files with + * non-default bytes-per-crc without setting CHECKSUMTYPE as a preserved + * attribute is expected to succeed with CRC checks enabled. + */ + protected boolean expectDifferentBytesPerCrcToSucceed() { + return false; + } + + protected static void setCluster(MiniDFSCluster c) { + cluster = c; + } + + protected static Configuration getConfigurationForCluster() + throws IOException { Configuration configuration = new Configuration(); - System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data"); + System.setProperty( + "test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data"); configuration.set("hadoop.log.dir", "target/tmp"); configuration.set("dfs.namenode.fs-limits.min-block-size", "0"); LOG.debug("fs.default.name == " + configuration.get("fs.default.name")); @@ -136,7 +158,8 @@ private static void appendSourceData() throws Exception { } } - private static void createSourceDataWithDifferentBlockSize() throws Exception { + private static void createSourceDataWithDifferentBlockSize() + throws Exception { mkdirs(SOURCE_PATH + "/1"); mkdirs(SOURCE_PATH + "/2"); mkdirs(SOURCE_PATH + "/2/3/4"); @@ -163,6 +186,21 @@ private static void createSourceDataWithDifferentChecksumType() 512)); } + private static void createSourceDataWithDifferentBytesPerCrc() + throws Exception { + mkdirs(SOURCE_PATH + "/1"); + mkdirs(SOURCE_PATH + "/2"); + mkdirs(SOURCE_PATH + "/2/3/4"); + mkdirs(SOURCE_PATH + "/2/3"); + mkdirs(SOURCE_PATH + "/5"); + touchFile(SOURCE_PATH + "/5/6", false, + new ChecksumOpt(DataChecksum.Type.CRC32C, 32)); + mkdirs(SOURCE_PATH + "/7"); + mkdirs(SOURCE_PATH + "/7/8"); + touchFile(SOURCE_PATH + "/7/8/9", false, + new ChecksumOpt(DataChecksum.Type.CRC32C, 64)); + } + private static void mkdirs(String path) throws Exception { FileSystem fileSystem = cluster.getFileSystem(); final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(), @@ -281,7 +319,7 @@ public void testCopyWithAppend() throws Exception { path)), context); } - verifyCopy(fs, false); + verifyCopy(fs, false, true); // verify that we only copied new appended data Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED) @@ -317,6 +355,11 @@ private void testCopy(boolean preserveChecksum) throws Exception { EnumSet fileAttributes = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION); if (preserveChecksum) { + // We created source files with both different checksum types and + // non-default block sizes; here we don't explicitly add BLOCKSIZE + // as a preserved attribute, but the current behavior is that + // preserving CHECKSUMTYPE also automatically implies preserving + // BLOCKSIZE. fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE); } configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(), @@ -339,7 +382,7 @@ private void testCopy(boolean preserveChecksum) throws Exception { } // Check that the maps worked. 
- verifyCopy(fs, preserveChecksum); + verifyCopy(fs, preserveChecksum, true); Assert.assertEquals(numFiles, stubContext.getReporter() .getCounter(CopyMapper.Counter.COPY).getValue()); Assert.assertEquals(numDirs, stubContext.getReporter() @@ -361,7 +404,8 @@ private void testCopy(boolean preserveChecksum) throws Exception { } } - private void verifyCopy(FileSystem fs, boolean preserveChecksum) + private void verifyCopy( + FileSystem fs, boolean preserveChecksum, boolean preserveReplication) throws Exception { for (Path path : pathList) { final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH, @@ -370,8 +414,10 @@ private void verifyCopy(FileSystem fs, boolean preserveChecksum) Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path)); FileStatus sourceStatus = fs.getFileStatus(path); FileStatus targetStatus = fs.getFileStatus(targetPath); - Assert.assertEquals(sourceStatus.getReplication(), - targetStatus.getReplication()); + if (preserveReplication) { + Assert.assertEquals(sourceStatus.getReplication(), + targetStatus.getReplication()); + } if (preserveChecksum) { Assert.assertEquals(sourceStatus.getBlockSize(), targetStatus.getBlockSize()); @@ -505,7 +551,7 @@ public Mapper.Context run() { @Override public FileSystem run() { try { - return FileSystem.get(configuration); + return FileSystem.get(cluster.getConfiguration(0)); } catch (IOException e) { LOG.error("Exception encountered ", e); Assert.fail("Test failed: " + e.getMessage()); @@ -574,7 +620,7 @@ public Mapper.Context run() { @Override public FileSystem run() { try { - return FileSystem.get(configuration); + return FileSystem.get(cluster.getConfiguration(0)); } catch (IOException e) { LOG.error("Exception encountered ", e); Assert.fail("Test failed: " + e.getMessage()); @@ -649,7 +695,7 @@ public StubContext run() { @Override public FileSystem run() { try { - return FileSystem.get(configuration); + return FileSystem.get(cluster.getConfiguration(0)); } catch (IOException e) { LOG.error("Exception encountered ", e); Assert.fail("Test failed: " + e.getMessage()); @@ -730,7 +776,7 @@ public StubContext run() { @Override public FileSystem run() { try { - return FileSystem.get(configuration); + return FileSystem.get(cluster.getConfiguration(0)); } catch (IOException e) { LOG.error("Exception encountered ", e); Assert.fail("Test failed: " + e.getMessage()); @@ -887,7 +933,7 @@ private void doTestIgnoreFailuresDoubleWrapped(final boolean ignoreFailures) { @Override public FileSystem run() { try { - return FileSystem.get(configuration); + return FileSystem.get(cluster.getConfiguration(0)); } catch (IOException e) { LOG.error("Exception encountered when get FileSystem.", e); throw new RuntimeException(e); @@ -938,12 +984,13 @@ public void testPreserveBlockSizeAndReplication() { } @Test(timeout=40000) - public void testCopyFailOnBlockSizeDifference() throws Exception { + public void testCopyWithDifferentBlockSizes() throws Exception { try { deleteState(); createSourceDataWithDifferentBlockSize(); FileSystem fs = cluster.getFileSystem(); + CopyMapper copyMapper = new CopyMapper(); StubContext stubContext = new StubContext(getConfiguration(), null, 0); Mapper.Context context @@ -959,17 +1006,79 @@ public void testCopyFailOnBlockSizeDifference() throws Exception { for (Path path : pathList) { final FileStatus fileStatus = fs.getFileStatus(path); - copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), - path)), new CopyListingFileStatus(fileStatus), context); + copyMapper.map( + new Text( + 
DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)), + new CopyListingFileStatus(fileStatus), context); } - Assert.fail("Copy should have failed because of block-size difference."); + if (expectDifferentBlockSizesMultipleBlocksToSucceed()) { + verifyCopy(fs, false, false); + } else { + Assert.fail( + "Copy should have failed because of block-size difference."); + } + } catch (Exception exception) { + if (expectDifferentBlockSizesMultipleBlocksToSucceed()) { + throw exception; + } else { + // Check that the exception suggests the use of -pb/-skipcrccheck. + // This could be refactored to use LambdaTestUtils if we add support + // for listing multiple different independent substrings to expect + // in the exception message and add support for LambdaTestUtils to + // inspect the transitive cause and/or suppressed exceptions as well. + Throwable cause = exception.getCause().getCause(); + GenericTestUtils.assertExceptionContains("-pb", cause); + GenericTestUtils.assertExceptionContains("-skipcrccheck", cause); + } } - catch (IOException exception) { - // Check that the exception suggests the use of -pb/-skipcrccheck. - Throwable cause = exception.getCause().getCause(); - GenericTestUtils.assertExceptionContains("-pb", cause); - GenericTestUtils.assertExceptionContains("-skipcrccheck", cause); + } + + @Test(timeout=40000) + public void testCopyWithDifferentBytesPerCrc() throws Exception { + try { + deleteState(); + createSourceDataWithDifferentBytesPerCrc(); + + FileSystem fs = cluster.getFileSystem(); + + CopyMapper copyMapper = new CopyMapper(); + StubContext stubContext = new StubContext(getConfiguration(), null, 0); + Mapper.Context context + = stubContext.getContext(); + + Configuration configuration = context.getConfiguration(); + EnumSet fileAttributes + = EnumSet.noneOf(DistCpOptions.FileAttribute.class); + configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(), + DistCpUtils.packAttributes(fileAttributes)); + + copyMapper.setup(context); + + for (Path path : pathList) { + final FileStatus fileStatus = fs.getFileStatus(path); + copyMapper.map( + new Text( + DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)), + new CopyListingFileStatus(fileStatus), context); + } + + if (expectDifferentBytesPerCrcToSucceed()) { + verifyCopy(fs, false, false); + } else { + Assert.fail( + "Copy should have failed because of bytes-per-crc difference."); + } + } catch (Exception exception) { + if (expectDifferentBytesPerCrcToSucceed()) { + throw exception; + } else { + // This could be refactored to use LambdaTestUtils if we add support + // for LambdaTestUtils to inspect the transitive cause and/or + // suppressed exceptions as well. 
+ Throwable cause = exception.getCause().getCause(); + GenericTestUtils.assertExceptionContains("mismatch", cause); + } } } @@ -980,6 +1089,7 @@ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){ createSourceData(); FileSystem fs = cluster.getFileSystem(); + CopyMapper copyMapper = new CopyMapper(); StubContext stubContext = new StubContext(getConfiguration(), null, 0); Mapper.Context context @@ -1010,6 +1120,12 @@ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){ final FileStatus source = fs.getFileStatus(path); final FileStatus target = fs.getFileStatus(targetPath); if (!source.isDirectory() ) { + // The reason the checksum check succeeds despite block sizes not + // matching between the two is that when only one block is ever + // written (partial or complete), the crcPerBlock is not included + // in the FileChecksum algorithmName. If we had instead written + // a large enough file to exceed the blocksize, then the copy + // would not have succeeded. Assert.assertTrue(preserve || source.getBlockSize() != target.getBlockSize()); Assert.assertTrue(preserve || @@ -1020,8 +1136,7 @@ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){ source.getReplication() == target.getReplication()); } } - } - catch (Exception e) { + } catch (Exception e) { Assert.assertTrue("Unexpected exception: " + e.getMessage(), false); e.printStackTrace(); } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java new file mode 100644 index 0000000000..6ed86e385d --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + +import org.junit.BeforeClass; + +/** + * End-to-end tests for COMPOSITE_CRC combine mode. 
+ */
+public class TestCopyMapperCompositeCrc extends TestCopyMapper {
+  @BeforeClass
+  public static void setup() throws Exception {
+    Configuration configuration = TestCopyMapper.getConfigurationForCluster();
+    configuration.set(
+        HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC");
+    TestCopyMapper.setCluster(new MiniDFSCluster.Builder(configuration)
+        .numDataNodes(1)
+        .format(true)
+        .build());
+  }
+
+  @Override
+  protected boolean expectDifferentBlockSizesMultipleBlocksToSucceed() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectDifferentBytesPerCrcToSucceed() {
+    return true;
+  }
+}
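
An illustrative aside, not part of the patch: of the two DFSTestUtil helpers added above, the writeFile overload taking an explicit blockSize feeds testDifferentBlockSizeReplicatedFileChecksum, while appendFileNewBlock(DistributedFileSystem, Path, byte[]) is what lets testMixedBytesPerChecksum build a single file whose blocks were written with different bytes-per-CRC. A minimal sketch of that second usage, assuming a running MiniDFSCluster whose configuration is passed in and whose default FileSystem is HDFS (the class and path names here are hypothetical):

    import java.io.IOException;
    import java.util.Random;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class MixedBytesPerCrcFixtureSketch {
      /**
       * Writes the first two chunks with the cluster's default bytes-per-CRC,
       * then appends the remaining chunk as a brand new block through a client
       * configured with half the bytes-per-CRC, mirroring
       * TestFileChecksum#testMixedBytesPerChecksum.
       */
      public static FileChecksum buildAndChecksum(
          DistributedFileSystem fs, Configuration conf, int bytesPerCrc)
          throws IOException {
        Path file = new Path("/mixedBytesPerCrcFile");  // hypothetical path
        byte[] data = new byte[bytesPerCrc * 3];
        new Random(0).nextBytes(data);

        byte[] part1 = new byte[bytesPerCrc * 2];
        byte[] part2 = new byte[data.length - part1.length];
        System.arraycopy(data, 0, part1, 0, part1.length);
        System.arraycopy(data, part1.length, part2, 0, part2.length);

        // First block(s): written with the default bytes-per-checksum.
        DFSTestUtil.writeFile(fs, file, part1);

        // Tail: appended as a new block by a client using a smaller
        // bytes-per-checksum, so the file ends up with mixed chunk sizes.
        Configuration smallCrcConf = new Configuration(conf);
        smallCrcConf.setInt(
            HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesPerCrc / 2);
        DistributedFileSystem smallCrcFs =
            (DistributedFileSystem) FileSystem.newInstance(smallCrcConf);
        try {
          DFSTestUtil.appendFileNewBlock(smallCrcFs, file, part2);
        } finally {
          smallCrcFs.close();
        }

        // With the combine mode set to COMPOSITE_CRC (via
        // HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY) this returns a
        // checksum equal to that of an identical file written in one pass;
        // in the default MD5MD5CRC mode it throws an IOException instead.
        return fs.getFileChecksum(file);
      }
    }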
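
TestFileChecksumCompositeCrc itself only flips the expected outcomes; the behavior it exercises is selected by one client-side key. As a hedged sketch of how an application might compare a replicated copy of a file against, say, an erasure-coded copy of the same bytes (method and path names are illustrative; both paths are assumed to live on HDFS):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class CompositeCrcComparisonSketch {
      /**
       * Returns true if the two files carry the same composite CRC, which the
       * new tests show is independent of block size, bytes-per-CRC, and
       * whether either copy is striped or replicated.
       */
      public static boolean sameCompositeCrc(
          Configuration baseConf, Path fileA, Path fileB) throws IOException {
        Configuration conf = new Configuration(baseConf);
        // Ask this client for layout-independent composite CRCs.
        conf.set(
            HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY,
            "COMPOSITE_CRC");

        try (FileSystem fs = FileSystem.newInstance(conf)) {
          FileChecksum a = fs.getFileChecksum(fileA);
          FileChecksum b = fs.getFileChecksum(fileB);
          return a.equals(b);
        }
      }
    }

FileSystem.newInstance is used rather than the cached FileSystem.get so that the combine-mode override applies only to this instance, not to every other user of the cached filesystem for the same URI.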
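
The comments in testCopyWithDifferentBlockSizes and testCopyWithDifferentBytesPerCrc note that the hand-rolled exception.getCause().getCause() unwrapping could go away if a test utility could match several independent substrings and walk the transitive cause chain. Purely as a sketch of that hypothetical helper (it exists in neither LambdaTestUtils nor GenericTestUtils, and it only walks causes, not suppressed exceptions):

    import org.junit.Assert;

    /** Hypothetical test helper; not part of the patch above. */
    public final class CauseChainAsserts {
      private CauseChainAsserts() {
      }

      /**
       * Asserts that some throwable in the cause chain of {@code t}
       * (including {@code t} itself) has a message containing every one of
       * the given substrings.
       */
      public static void assertCauseChainContains(
          Throwable t, String... expectedSubstrings) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
          String message = cur.getMessage();
          if (message == null) {
            continue;
          }
          boolean allFound = true;
          for (String expected : expectedSubstrings) {
            if (!message.contains(expected)) {
              allFound = false;
              break;
            }
          }
          if (allFound) {
            return;
          }
        }
        Assert.fail("No throwable in the cause chain of " + t
            + " contained all of: " + String.join(", ", expectedSubstrings));
      }
    }

With such a helper, the non-composite branch of testCopyWithDifferentBlockSizes could call assertCauseChainContains(exception, "-pb", "-skipcrccheck") without assuming the interesting IOException sits exactly two causes deep.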
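
TestCopyMapperCompositeCrc sets the combine mode on the MiniDFSCluster configuration and drives CopyMapper directly; in a real distcp run the same key would be set on the job configuration. A sketch of that, assuming the DistCpOptions.Builder API is available, with purely illustrative paths and error handling omitted:

    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.tools.DistCpOptions;

    public class DistCpCompositeCrcSketch {
      /** Kicks off a distcp whose CRC verification uses composite CRCs. */
      public static void copyWithCompositeCrc(
          Configuration baseConf, Path source, Path target) throws Exception {
        Configuration conf = new Configuration(baseConf);
        // Client-side switch only; stored metadata on either cluster is
        // unchanged.
        conf.set(
            HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY,
            "COMPOSITE_CRC");

        DistCpOptions options = new DistCpOptions.Builder(
            Collections.singletonList(source), target)
            .build();

        new DistCp(conf, options).execute();
      }
    }

This is the scenario the new tests encode: with composite CRCs, post-copy verification compares block-layout-independent checksums, so differing block sizes or bytes-per-CRC between source and target no longer force -pb (preserve block size) or -skipcrccheck.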