HDFS-14706. Checksums are not checked if block meta file is less than 7 bytes. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
parent
d207aba026
commit
915cbc91c0
@ -143,9 +143,12 @@ public static DataChecksum newDataChecksum(Type type, int bytesPerChecksum ) {
|
||||
* Creates a DataChecksum from HEADER_LEN bytes from arr[offset].
|
||||
* @return DataChecksum of the type in the array or null in case of an error.
|
||||
*/
|
||||
public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
|
||||
public static DataChecksum newDataChecksum(byte[] bytes, int offset)
|
||||
throws IOException {
|
||||
if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) {
|
||||
return null;
|
||||
throw new InvalidChecksumSizeException("Could not create DataChecksum "
|
||||
+ " from the byte array of length " + bytes.length
|
||||
+ " and offset "+ offset);
|
||||
}
|
||||
|
||||
// like readInt():
|
||||
@ -153,7 +156,14 @@ public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
|
||||
( (bytes[offset+2] & 0xff) << 16 ) |
|
||||
( (bytes[offset+3] & 0xff) << 8 ) |
|
||||
( (bytes[offset+4] & 0xff) );
|
||||
return newDataChecksum( Type.valueOf(bytes[offset]), bytesPerChecksum );
|
||||
DataChecksum csum = newDataChecksum(mapByteToChecksumType(bytes[offset]),
|
||||
bytesPerChecksum);
|
||||
if (csum == null) {
|
||||
throw new InvalidChecksumSizeException(("Could not create DataChecksum "
|
||||
+ " from the byte array of length " + bytes.length
|
||||
+ " and bytesPerCheckSum of "+ bytesPerChecksum));
|
||||
}
|
||||
return csum;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -164,7 +174,7 @@ public static DataChecksum newDataChecksum( DataInputStream in )
|
||||
throws IOException {
|
||||
int type = in.readByte();
|
||||
int bpc = in.readInt();
|
||||
DataChecksum summer = newDataChecksum(Type.valueOf(type), bpc );
|
||||
DataChecksum summer = newDataChecksum(mapByteToChecksumType(type), bpc);
|
||||
if ( summer == null ) {
|
||||
throw new InvalidChecksumSizeException("Could not create DataChecksum "
|
||||
+ "of type " + type + " with bytesPerChecksum " + bpc);
|
||||
@ -172,6 +182,16 @@ public static DataChecksum newDataChecksum( DataInputStream in )
|
||||
return summer;
|
||||
}
|
||||
|
||||
private static Type mapByteToChecksumType(int type)
|
||||
throws InvalidChecksumSizeException{
|
||||
try {
|
||||
return Type.valueOf(type);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new InvalidChecksumSizeException("The value "+type+" does not map"+
|
||||
" to a valid checksum Type");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the checksum header to the output stream <i>out</i>.
|
||||
*/
|
||||
|
@ -35,6 +35,7 @@
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.util.InvalidChecksumSizeException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -119,13 +120,19 @@ public static BlockMetadataHeader preadHeader(FileChannel fc)
|
||||
ByteBuffer buf = ByteBuffer.wrap(arr);
|
||||
|
||||
while (buf.hasRemaining()) {
|
||||
if (fc.read(buf, 0) <= 0) {
|
||||
throw new EOFException("unexpected EOF while reading " +
|
||||
"metadata file header");
|
||||
if (fc.read(buf, buf.position()) <= 0) {
|
||||
throw new CorruptMetaHeaderException("EOF while reading header from "+
|
||||
"the metadata file. The meta file may be truncated or corrupt");
|
||||
}
|
||||
}
|
||||
short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
|
||||
DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
|
||||
DataChecksum dataChecksum;
|
||||
try {
|
||||
dataChecksum = DataChecksum.newDataChecksum(arr, 2);
|
||||
} catch (InvalidChecksumSizeException e) {
|
||||
throw new CorruptMetaHeaderException("The block meta file header is "+
|
||||
"corrupt", e);
|
||||
}
|
||||
return new BlockMetadataHeader(version, dataChecksum);
|
||||
}
|
||||
|
||||
@ -136,7 +143,14 @@ public static BlockMetadataHeader preadHeader(FileChannel fc)
|
||||
*/
|
||||
public static BlockMetadataHeader readHeader(DataInputStream in)
|
||||
throws IOException {
|
||||
try {
|
||||
return readHeader(in.readShort(), in);
|
||||
} catch (EOFException eof) {
|
||||
// The attempt to read the header threw EOF, indicating there are not
|
||||
// enough bytes in the meta file for the header.
|
||||
throw new CorruptMetaHeaderException("EOF while reading header from meta"+
|
||||
". The meta file may be truncated or corrupt", eof);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -170,7 +184,13 @@ public static BlockMetadataHeader readHeader(RandomAccessFile raf)
|
||||
// Version is already read.
|
||||
private static BlockMetadataHeader readHeader(short version,
|
||||
DataInputStream in) throws IOException {
|
||||
DataChecksum checksum = DataChecksum.newDataChecksum(in);
|
||||
DataChecksum checksum = null;
|
||||
try {
|
||||
checksum = DataChecksum.newDataChecksum(in);
|
||||
} catch (InvalidChecksumSizeException e) {
|
||||
throw new CorruptMetaHeaderException("The block meta file header is "+
|
||||
"corrupt", e);
|
||||
}
|
||||
return new BlockMetadataHeader(version, checksum);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Exception object that is thrown when the block metadata file is corrupt.
|
||||
*/
|
||||
public class CorruptMetaHeaderException extends IOException {
|
||||
|
||||
CorruptMetaHeaderException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
|
||||
CorruptMetaHeaderException(String msg, Throwable cause) {
|
||||
super(msg, cause);
|
||||
}
|
||||
|
||||
}
|
@ -326,13 +326,22 @@ class BlockSender implements java.io.Closeable {
|
||||
// storage. The header is important for determining the checksum
|
||||
// type later when lazy persistence copies the block to non-transient
|
||||
// storage and computes the checksum.
|
||||
int expectedHeaderSize = BlockMetadataHeader.getHeaderSize();
|
||||
if (!replica.isOnTransientStorage() &&
|
||||
metaIn.getLength() >= BlockMetadataHeader.getHeaderSize()) {
|
||||
metaIn.getLength() >= expectedHeaderSize) {
|
||||
checksumIn = new DataInputStream(new BufferedInputStream(
|
||||
metaIn, IO_FILE_BUFFER_SIZE));
|
||||
|
||||
csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
|
||||
keepMetaInOpen = true;
|
||||
} else if (!replica.isOnTransientStorage() &&
|
||||
metaIn.getLength() < expectedHeaderSize) {
|
||||
LOG.warn("The meta file length {} is less than the expected " +
|
||||
"header length {}, indicating the meta file is corrupt",
|
||||
metaIn.getLength(), expectedHeaderSize);
|
||||
throw new CorruptMetaHeaderException("The meta file length "+
|
||||
metaIn.getLength()+" is less than the expected length "+
|
||||
expectedHeaderSize);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Could not find metadata file for " + block);
|
||||
|
@ -208,7 +208,6 @@
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.InvalidChecksumSizeException;
|
||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.util.ServicePlugin;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
@ -3474,7 +3473,7 @@ private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
|
||||
void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) {
|
||||
|
||||
boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException
|
||||
|| e instanceof InvalidChecksumSizeException);
|
||||
|| e instanceof CorruptMetaHeaderException);
|
||||
|
||||
if (!isBadBlock) {
|
||||
return;
|
||||
|
@ -647,6 +647,12 @@ public void readBlock(final ExtendedBlock block,
|
||||
dnR, block, remoteAddress, ioe);
|
||||
incrDatanodeNetworkErrors();
|
||||
}
|
||||
// Normally the client reports a bad block to the NN. However if the
|
||||
// meta file is corrupt or an disk error occurs (EIO), then the client
|
||||
// never gets a chance to do validation, and hence will never report
|
||||
// the block as bad. For some classes of IO exception, the DN should
|
||||
// report the block as bad, via the handleBadBlock() method
|
||||
datanode.handleBadBlock(block, ioe, false);
|
||||
throw ioe;
|
||||
} finally {
|
||||
IOUtils.closeStream(blockSender);
|
||||
@ -1118,6 +1124,12 @@ public void copyBlock(final ExtendedBlock block,
|
||||
isOpSuccess = false;
|
||||
LOG.info("opCopyBlock {} received exception {}", block, ioe.toString());
|
||||
incrDatanodeNetworkErrors();
|
||||
// Normally the client reports a bad block to the NN. However if the
|
||||
// meta file is corrupt or an disk error occurs (EIO), then the client
|
||||
// never gets a chance to do validation, and hence will never report
|
||||
// the block as bad. For some classes of IO exception, the DN should
|
||||
// report the block as bad, via the handleBadBlock() method
|
||||
datanode.handleBadBlock(block, ioe, false);
|
||||
throw ioe;
|
||||
} finally {
|
||||
dataXceiverServer.balanceThrottler.release();
|
||||
|
@ -0,0 +1,165 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* Tests to ensure that a block is not read successfully from a datanode
|
||||
* when it has a corrupt metadata file.
|
||||
*/
|
||||
public class TestCorruptMetadataFile {
|
||||
|
||||
private MiniDFSCluster cluster;
|
||||
private MiniDFSCluster.Builder clusterBuilder;
|
||||
private Configuration conf;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new HdfsConfiguration();
|
||||
// Reduce block acquire retries as we only have 1 DN and it allows the
|
||||
// test to run faster
|
||||
conf.setInt(
|
||||
HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 1);
|
||||
clusterBuilder = new MiniDFSCluster.Builder(conf).numDataNodes(1);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testReadBlockFailsWhenMetaIsCorrupt() throws Exception {
|
||||
cluster = clusterBuilder.build();
|
||||
cluster.waitActive();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dn0 = cluster.getDataNodes().get(0);
|
||||
Path filePath = new Path("test.dat");
|
||||
FSDataOutputStream out = fs.create(filePath, (short) 1);
|
||||
out.write(1);
|
||||
out.hflush();
|
||||
out.close();
|
||||
|
||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
|
||||
File metadataFile = cluster.getBlockMetadataFile(0, block);
|
||||
|
||||
// First ensure we can read the file OK
|
||||
FSDataInputStream in = fs.open(filePath);
|
||||
in.readByte();
|
||||
in.close();
|
||||
|
||||
// Now truncate the meta file, and ensure the data is not read OK
|
||||
RandomAccessFile raFile = new RandomAccessFile(metadataFile, "rw");
|
||||
raFile.setLength(0);
|
||||
|
||||
FSDataInputStream intrunc = fs.open(filePath);
|
||||
LambdaTestUtils.intercept(BlockMissingException.class,
|
||||
() -> intrunc.readByte());
|
||||
intrunc.close();
|
||||
|
||||
// Write 11 bytes to the file, but an invalid header
|
||||
raFile.write("12345678901".getBytes());
|
||||
assertEquals(11, raFile.length());
|
||||
|
||||
FSDataInputStream ininvalid = fs.open(filePath);
|
||||
LambdaTestUtils.intercept(BlockMissingException.class,
|
||||
() -> ininvalid.readByte());
|
||||
ininvalid.close();
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return cluster.getNameNode().getNamesystem()
|
||||
.getBlockManager().getCorruptBlocks() == 1;
|
||||
}
|
||||
}, 100, 5000);
|
||||
|
||||
raFile.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test create a sample block meta file and then attempts to load it
|
||||
* using BlockMetadataHeader to ensure it can load a valid file and that it
|
||||
* throws a CorruptMetaHeaderException when the header is invalid.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testBlockMetaDataHeaderPReadHandlesCorruptMetaFile()
|
||||
throws Exception {
|
||||
File testDir = GenericTestUtils.getTestDir();
|
||||
RandomAccessFile raFile = new RandomAccessFile(
|
||||
new File(testDir, "metafile"), "rw");
|
||||
|
||||
// Write a valid header into the file
|
||||
// Version
|
||||
raFile.writeShort((short)1);
|
||||
// Checksum type
|
||||
raFile.writeByte(1);
|
||||
// Bytes per checksum
|
||||
raFile.writeInt(512);
|
||||
// We should be able to get the header with no exceptions
|
||||
BlockMetadataHeader header =
|
||||
BlockMetadataHeader.preadHeader(raFile.getChannel());
|
||||
|
||||
// Now truncate the meta file to zero and ensure an exception is raised
|
||||
raFile.setLength(0);
|
||||
LambdaTestUtils.intercept(CorruptMetaHeaderException.class,
|
||||
() -> BlockMetadataHeader.preadHeader(raFile.getChannel()));
|
||||
|
||||
// Now write a partial valid header to sure an exception is thrown
|
||||
// if the header cannot be fully read
|
||||
// Version
|
||||
raFile.writeShort((short)1);
|
||||
// Checksum type
|
||||
raFile.writeByte(1);
|
||||
|
||||
LambdaTestUtils.intercept(CorruptMetaHeaderException.class,
|
||||
() -> BlockMetadataHeader.preadHeader(raFile.getChannel()));
|
||||
|
||||
// Finally write the expected 7 bytes, but invalid data
|
||||
raFile.setLength(0);
|
||||
raFile.write("1234567".getBytes());
|
||||
|
||||
LambdaTestUtils.intercept(CorruptMetaHeaderException.class,
|
||||
() -> BlockMetadataHeader.preadHeader(raFile.getChannel()));
|
||||
|
||||
raFile.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user