HDFS-2130. Switch default checksum to CRC32C. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1196889 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
201a18de0c
commit
f84552ac35
@ -875,6 +875,8 @@ Release 0.23.0 - 2011-11-01
|
||||
|
||||
HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd)
|
||||
|
||||
HDFS-2130. Switch default checksum to CRC32C. (todd)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HDFS-2347. Fix checkpointTxnCount's comment about editlog size.
|
||||
|
@ -98,6 +98,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
/********************************************************
|
||||
@ -139,6 +140,7 @@ public class DFSClient implements java.io.Closeable {
|
||||
final int maxBlockAcquireFailures;
|
||||
final int confTime;
|
||||
final int ioBufferSize;
|
||||
final int checksumType;
|
||||
final int bytesPerChecksum;
|
||||
final int writePacketSize;
|
||||
final int socketTimeout;
|
||||
@ -163,6 +165,7 @@ public class DFSClient implements java.io.Closeable {
|
||||
ioBufferSize = conf.getInt(
|
||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
||||
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
checksumType = getChecksumType(conf);
|
||||
bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
|
||||
DFS_BYTES_PER_CHECKSUM_DEFAULT);
|
||||
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
||||
@ -190,6 +193,26 @@ public class DFSClient implements java.io.Closeable {
|
||||
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
|
||||
uMask = FsPermission.getUMask(conf);
|
||||
}
|
||||
|
||||
private int getChecksumType(Configuration conf) {
|
||||
String checksum = conf.get(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
|
||||
DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
|
||||
if ("CRC32".equals(checksum)) {
|
||||
return DataChecksum.CHECKSUM_CRC32;
|
||||
} else if ("CRC32C".equals(checksum)) {
|
||||
return DataChecksum.CHECKSUM_CRC32C;
|
||||
} else if ("NULL".equals(checksum)) {
|
||||
return DataChecksum.CHECKSUM_NULL;
|
||||
} else {
|
||||
LOG.warn("Bad checksum type: " + checksum + ". Using default.");
|
||||
return DataChecksum.CHECKSUM_CRC32C;
|
||||
}
|
||||
}
|
||||
|
||||
private DataChecksum createChecksum() {
|
||||
return DataChecksum.newDataChecksum(
|
||||
checksumType, bytesPerChecksum);
|
||||
}
|
||||
}
|
||||
|
||||
Conf getConf() {
|
||||
@ -773,7 +796,7 @@ public class DFSClient implements java.io.Closeable {
|
||||
}
|
||||
final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
|
||||
createParent, replication, blockSize, progress, buffersize,
|
||||
dfsClientConf.bytesPerChecksum);
|
||||
dfsClientConf.createChecksum());
|
||||
leaserenewer.put(src, result, this);
|
||||
return result;
|
||||
}
|
||||
@ -817,9 +840,12 @@ public class DFSClient implements java.io.Closeable {
|
||||
CreateFlag.validate(flag);
|
||||
DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
|
||||
if (result == null) {
|
||||
DataChecksum checksum = DataChecksum.newDataChecksum(
|
||||
dfsClientConf.checksumType,
|
||||
bytesPerChecksum);
|
||||
result = new DFSOutputStream(this, src, absPermission,
|
||||
flag, createParent, replication, blockSize, progress, buffersize,
|
||||
bytesPerChecksum);
|
||||
checksum);
|
||||
}
|
||||
leaserenewer.put(src, result, this);
|
||||
return result;
|
||||
@ -877,7 +903,7 @@ public class DFSClient implements java.io.Closeable {
|
||||
UnresolvedPathException.class);
|
||||
}
|
||||
return new DFSOutputStream(this, src, buffersize, progress,
|
||||
lastBlock, stat, dfsClientConf.bytesPerChecksum);
|
||||
lastBlock, stat, dfsClientConf.createChecksum());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -38,6 +38,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
|
||||
public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
|
||||
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
|
||||
public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
|
||||
public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
|
||||
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
|
||||
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
|
||||
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
|
||||
|
@ -74,7 +74,6 @@ import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.PureJavaCrc32;
|
||||
|
||||
|
||||
/****************************************************************
|
||||
@ -1206,8 +1205,9 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||
}
|
||||
|
||||
private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
|
||||
int bytesPerChecksum, short replication) throws IOException {
|
||||
super(new PureJavaCrc32(), bytesPerChecksum, 4);
|
||||
DataChecksum checksum, short replication) throws IOException {
|
||||
super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
|
||||
int bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
this.dfsClient = dfsClient;
|
||||
this.src = src;
|
||||
this.blockSize = blockSize;
|
||||
@ -1225,8 +1225,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||
"multiple of io.bytes.per.checksum");
|
||||
|
||||
}
|
||||
checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
|
||||
bytesPerChecksum);
|
||||
this.checksum = checksum;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1235,11 +1234,12 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||
*/
|
||||
DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize, Progressable progress,
|
||||
int buffersize, int bytesPerChecksum)
|
||||
int buffersize, DataChecksum checksum)
|
||||
throws IOException {
|
||||
this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
|
||||
this(dfsClient, src, blockSize, progress, checksum, replication);
|
||||
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
||||
checksum.getBytesPerChecksum());
|
||||
|
||||
try {
|
||||
dfsClient.namenode.create(
|
||||
@ -1264,8 +1264,8 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||
*/
|
||||
DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
|
||||
LocatedBlock lastBlock, HdfsFileStatus stat,
|
||||
int bytesPerChecksum) throws IOException {
|
||||
this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
|
||||
DataChecksum checksum) throws IOException {
|
||||
this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
|
||||
initialFileSize = stat.getLen(); // length of file when opened
|
||||
|
||||
//
|
||||
@ -1274,9 +1274,10 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||
if (lastBlock != null) {
|
||||
// indicate that we are appending to an existing block
|
||||
bytesCurBlock = lastBlock.getBlockSize();
|
||||
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
|
||||
streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
|
||||
} else {
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
||||
checksum.getBytesPerChecksum());
|
||||
streamer = new DataStreamer();
|
||||
}
|
||||
streamer.start();
|
||||
|
@ -18,12 +18,15 @@
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
import org.apache.commons.httpclient.methods.GetMethod;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
@ -88,6 +91,18 @@ class BlockMetadataHeader {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the header at the beginning of the given block meta file.
|
||||
* The current file position will be altered by this method.
|
||||
* If an error occurs, the file is <em>not</em> closed.
|
||||
*/
|
||||
static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException {
|
||||
byte[] buf = new byte[getHeaderSize()];
|
||||
raf.seek(0);
|
||||
raf.readFully(buf, 0, buf.length);
|
||||
return readHeader(new DataInputStream(new ByteArrayInputStream(buf)));
|
||||
}
|
||||
|
||||
// Version is already read.
|
||||
private static BlockMetadataHeader readHeader(short version, DataInputStream in)
|
||||
throws IOException {
|
||||
|
@ -63,7 +63,15 @@ class BlockReceiver implements Closeable {
|
||||
private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
|
||||
|
||||
private DataInputStream in = null; // from where data are read
|
||||
private DataChecksum checksum; // from where chunks of a block can be read
|
||||
private DataChecksum clientChecksum; // checksum used by client
|
||||
private DataChecksum diskChecksum; // checksum we write to disk
|
||||
|
||||
/**
|
||||
* In the case that the client is writing with a different
|
||||
* checksum polynomial than the block is stored with on disk,
|
||||
* the DataNode needs to recalculate checksums before writing.
|
||||
*/
|
||||
private boolean needsChecksumTranslation;
|
||||
private OutputStream out = null; // to block file at local disk
|
||||
private FileDescriptor outFd;
|
||||
private OutputStream cout = null; // output stream for cehcksum file
|
||||
@ -177,33 +185,35 @@ class BlockReceiver implements Closeable {
|
||||
" while receiving block " + block + " from " + inAddr);
|
||||
}
|
||||
}
|
||||
// read checksum meta information
|
||||
this.checksum = requestedChecksum;
|
||||
this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
this.checksumSize = checksum.getChecksumSize();
|
||||
this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
|
||||
this.syncBehindWrites = datanode.shouldSyncBehindWrites();
|
||||
|
||||
final boolean isCreate = isDatanode || isTransfer
|
||||
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
streams = replicaInfo.createStreams(isCreate,
|
||||
this.bytesPerChecksum, this.checksumSize);
|
||||
if (streams != null) {
|
||||
this.out = streams.dataOut;
|
||||
if (out instanceof FileOutputStream) {
|
||||
this.outFd = ((FileOutputStream)out).getFD();
|
||||
} else {
|
||||
LOG.warn("Could not get file descriptor for outputstream of class " +
|
||||
out.getClass());
|
||||
}
|
||||
this.cout = streams.checksumOut;
|
||||
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
|
||||
streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
// write data chunk header if creating a new replica
|
||||
if (isCreate) {
|
||||
BlockMetadataHeader.writeHeader(checksumOut, checksum);
|
||||
}
|
||||
streams = replicaInfo.createStreams(isCreate, requestedChecksum);
|
||||
assert streams != null : "null streams!";
|
||||
|
||||
// read checksum meta information
|
||||
this.clientChecksum = requestedChecksum;
|
||||
this.diskChecksum = streams.getChecksum();
|
||||
this.needsChecksumTranslation = !clientChecksum.equals(diskChecksum);
|
||||
this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
|
||||
this.checksumSize = diskChecksum.getChecksumSize();
|
||||
|
||||
this.out = streams.dataOut;
|
||||
if (out instanceof FileOutputStream) {
|
||||
this.outFd = ((FileOutputStream)out).getFD();
|
||||
} else {
|
||||
LOG.warn("Could not get file descriptor for outputstream of class " +
|
||||
out.getClass());
|
||||
}
|
||||
this.cout = streams.checksumOut;
|
||||
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
|
||||
streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
// write data chunk header if creating a new replica
|
||||
if (isCreate) {
|
||||
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
|
||||
}
|
||||
} catch (ReplicaAlreadyExistsException bae) {
|
||||
throw bae;
|
||||
} catch (ReplicaNotFoundException bne) {
|
||||
@ -315,9 +325,9 @@ class BlockReceiver implements Closeable {
|
||||
while (len > 0) {
|
||||
int chunkLen = Math.min(len, bytesPerChecksum);
|
||||
|
||||
checksum.update(dataBuf, dataOff, chunkLen);
|
||||
clientChecksum.update(dataBuf, dataOff, chunkLen);
|
||||
|
||||
if (!checksum.compare(checksumBuf, checksumOff)) {
|
||||
if (!clientChecksum.compare(checksumBuf, checksumOff)) {
|
||||
if (srcDataNode != null) {
|
||||
try {
|
||||
LOG.info("report corrupt block " + block + " from datanode " +
|
||||
@ -334,12 +344,32 @@ class BlockReceiver implements Closeable {
|
||||
"while writing " + block + " from " + inAddr);
|
||||
}
|
||||
|
||||
checksum.reset();
|
||||
clientChecksum.reset();
|
||||
dataOff += chunkLen;
|
||||
checksumOff += checksumSize;
|
||||
len -= chunkLen;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Translate CRC chunks from the client's checksum implementation
|
||||
* to the disk checksum implementation.
|
||||
*
|
||||
* This does not verify the original checksums, under the assumption
|
||||
* that they have already been validated.
|
||||
*/
|
||||
private void translateChunks( byte[] dataBuf, int dataOff, int len,
|
||||
byte[] checksumBuf, int checksumOff )
|
||||
throws IOException {
|
||||
if (len == 0) return;
|
||||
|
||||
int numChunks = (len - 1)/bytesPerChecksum + 1;
|
||||
|
||||
diskChecksum.calculateChunkedSums(
|
||||
ByteBuffer.wrap(dataBuf, dataOff, len),
|
||||
ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes sure buf.position() is zero without modifying buf.remaining().
|
||||
@ -583,9 +613,16 @@ class BlockReceiver implements Closeable {
|
||||
* protocol includes acks and only the last datanode needs to verify
|
||||
* checksum.
|
||||
*/
|
||||
if (mirrorOut == null || isDatanode) {
|
||||
if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
|
||||
verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
|
||||
if (needsChecksumTranslation) {
|
||||
// overwrite the checksums in the packet buffer with the
|
||||
// appropriate polynomial for the disk storage.
|
||||
translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
|
||||
}
|
||||
}
|
||||
|
||||
// by this point, the data in the buffer uses the disk checksum
|
||||
|
||||
byte[] lastChunkChecksum;
|
||||
|
||||
@ -807,7 +844,7 @@ class BlockReceiver implements Closeable {
|
||||
// find offset of the beginning of partial chunk.
|
||||
//
|
||||
int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
|
||||
int checksumSize = checksum.getChecksumSize();
|
||||
int checksumSize = diskChecksum.getChecksumSize();
|
||||
blkoff = blkoff - sizePartialChunk;
|
||||
LOG.info("computePartialChunkCrc sizePartialChunk " +
|
||||
sizePartialChunk +
|
||||
@ -832,7 +869,8 @@ class BlockReceiver implements Closeable {
|
||||
}
|
||||
|
||||
// compute crc of partial chunk from data read in the block file.
|
||||
partialCrc = new PureJavaCrc32();
|
||||
partialCrc = DataChecksum.newDataChecksum(
|
||||
diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
|
||||
partialCrc.update(buf, 0, sizePartialChunk);
|
||||
LOG.info("Read in partial CRC chunk from disk for block " + block);
|
||||
|
||||
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
||||
/**
|
||||
@ -158,15 +159,23 @@ public interface FSDatasetInterface extends FSDatasetMBean {
|
||||
static class BlockWriteStreams {
|
||||
OutputStream dataOut;
|
||||
OutputStream checksumOut;
|
||||
BlockWriteStreams(OutputStream dOut, OutputStream cOut) {
|
||||
DataChecksum checksum;
|
||||
|
||||
BlockWriteStreams(OutputStream dOut, OutputStream cOut,
|
||||
DataChecksum checksum) {
|
||||
dataOut = dOut;
|
||||
checksumOut = cOut;
|
||||
this.checksum = checksum;
|
||||
}
|
||||
|
||||
void close() throws IOException {
|
||||
IOUtils.closeStream(dataOut);
|
||||
IOUtils.closeStream(checksumOut);
|
||||
}
|
||||
|
||||
DataChecksum getChecksum() {
|
||||
return checksum;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
@ -169,7 +170,7 @@ class ReplicaInPipeline extends ReplicaInfo
|
||||
|
||||
@Override // ReplicaInPipelineInterface
|
||||
public BlockWriteStreams createStreams(boolean isCreate,
|
||||
int bytesPerChunk, int checksumSize) throws IOException {
|
||||
DataChecksum requestedChecksum) throws IOException {
|
||||
File blockFile = getBlockFile();
|
||||
File metaFile = getMetaFile();
|
||||
if (DataNode.LOG.isDebugEnabled()) {
|
||||
@ -180,30 +181,64 @@ class ReplicaInPipeline extends ReplicaInfo
|
||||
}
|
||||
long blockDiskSize = 0L;
|
||||
long crcDiskSize = 0L;
|
||||
if (!isCreate) { // check on disk file
|
||||
blockDiskSize = bytesOnDisk;
|
||||
crcDiskSize = BlockMetadataHeader.getHeaderSize() +
|
||||
(blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
|
||||
if (blockDiskSize>0 &&
|
||||
(blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
|
||||
throw new IOException("Corrupted block: " + this);
|
||||
|
||||
// the checksum that should actually be used -- this
|
||||
// may differ from requestedChecksum for appends.
|
||||
DataChecksum checksum;
|
||||
|
||||
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
|
||||
|
||||
if (!isCreate) {
|
||||
// For append or recovery, we must enforce the existing checksum.
|
||||
// Also, verify that the file has correct lengths, etc.
|
||||
boolean checkedMeta = false;
|
||||
try {
|
||||
BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
|
||||
checksum = header.getChecksum();
|
||||
|
||||
if (checksum.getBytesPerChecksum() !=
|
||||
requestedChecksum.getBytesPerChecksum()) {
|
||||
throw new IOException("Client requested checksum " +
|
||||
requestedChecksum + " when appending to an existing block " +
|
||||
"with different chunk size: " + checksum);
|
||||
}
|
||||
|
||||
int bytesPerChunk = checksum.getBytesPerChecksum();
|
||||
int checksumSize = checksum.getChecksumSize();
|
||||
|
||||
blockDiskSize = bytesOnDisk;
|
||||
crcDiskSize = BlockMetadataHeader.getHeaderSize() +
|
||||
(blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
|
||||
if (blockDiskSize>0 &&
|
||||
(blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
|
||||
throw new IOException("Corrupted block: " + this);
|
||||
}
|
||||
checkedMeta = true;
|
||||
} finally {
|
||||
if (!checkedMeta) {
|
||||
// clean up in case of exceptions.
|
||||
IOUtils.closeStream(metaRAF);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// for create, we can use the requested checksum
|
||||
checksum = requestedChecksum;
|
||||
}
|
||||
|
||||
FileOutputStream blockOut = null;
|
||||
FileOutputStream crcOut = null;
|
||||
try {
|
||||
blockOut = new FileOutputStream(
|
||||
new RandomAccessFile( blockFile, "rw" ).getFD() );
|
||||
crcOut = new FileOutputStream(
|
||||
new RandomAccessFile( metaFile, "rw" ).getFD() );
|
||||
crcOut = new FileOutputStream(metaRAF.getFD() );
|
||||
if (!isCreate) {
|
||||
blockOut.getChannel().position(blockDiskSize);
|
||||
crcOut.getChannel().position(crcDiskSize);
|
||||
}
|
||||
return new BlockWriteStreams(blockOut, crcOut);
|
||||
return new BlockWriteStreams(blockOut, crcOut, checksum);
|
||||
} catch (IOException e) {
|
||||
IOUtils.closeStream(blockOut);
|
||||
IOUtils.closeStream(crcOut);
|
||||
IOUtils.closeStream(metaRAF);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
/**
|
||||
* This defines the interface of a replica in Pipeline that's being written to
|
||||
@ -61,11 +62,10 @@ interface ReplicaInPipelineInterface extends Replica {
|
||||
* one for block file and one for CRC file
|
||||
*
|
||||
* @param isCreate if it is for creation
|
||||
* @param bytePerChunk number of bytes per CRC chunk
|
||||
* @param checksumSize number of bytes per checksum
|
||||
* @param requestedChecksum the checksum the writer would prefer to use
|
||||
* @return output streams for writing
|
||||
* @throws IOException if any error occurs
|
||||
*/
|
||||
public BlockWriteStreams createStreams(boolean isCreate,
|
||||
int bytesPerChunk, int checksumSize) throws IOException;
|
||||
DataChecksum requestedChecksum) throws IOException;
|
||||
}
|
||||
|
@ -0,0 +1,165 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test cases for trying to append to a file with a different
|
||||
* checksum than the file was originally written with.
|
||||
*/
|
||||
public class TestAppendDifferentChecksum {
|
||||
private static final int SEGMENT_LENGTH = 1500;
|
||||
|
||||
// run the randomized test for 5 seconds
|
||||
private static final long RANDOM_TEST_RUNTIME = 5000;
|
||||
private static MiniDFSCluster cluster;
|
||||
private static FileSystem fs;
|
||||
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
|
||||
|
||||
// disable block scanner, since otherwise this test can trigger
|
||||
// HDFS-2525, which is a different bug than we're trying to unit test
|
||||
// here! When HDFS-2525 is fixed, this can be removed.
|
||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
|
||||
|
||||
conf.set("fs.hdfs.impl.disable.cache", "true");
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.build();
|
||||
fs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() throws IOException {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test does not run, since switching chunksize with append
|
||||
* is not implemented. Please see HDFS-2130 for a discussion of the
|
||||
* difficulties in doing so.
|
||||
*/
|
||||
@Test
|
||||
@Ignore("this is not implemented! See HDFS-2130")
|
||||
public void testSwitchChunkSize() throws IOException {
|
||||
FileSystem fsWithSmallChunk = createFsWithChecksum("CRC32", 512);
|
||||
FileSystem fsWithBigChunk = createFsWithChecksum("CRC32", 1024);
|
||||
Path p = new Path("/testSwitchChunkSize");
|
||||
appendWithTwoFs(p, fsWithSmallChunk, fsWithBigChunk);
|
||||
AppendTestUtil.check(fsWithSmallChunk, p, SEGMENT_LENGTH * 2);
|
||||
AppendTestUtil.check(fsWithBigChunk, p, SEGMENT_LENGTH * 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple unit test which writes some data with one algorithm,
|
||||
* then appends with another.
|
||||
*/
|
||||
@Test
|
||||
public void testSwitchAlgorithms() throws IOException {
|
||||
FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512);
|
||||
FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512);
|
||||
|
||||
Path p = new Path("/testSwitchAlgorithms");
|
||||
appendWithTwoFs(p, fsWithCrc32, fsWithCrc32C);
|
||||
// Regardless of which FS is used to read, it should pick up
|
||||
// the on-disk checksum!
|
||||
AppendTestUtil.check(fsWithCrc32C, p, SEGMENT_LENGTH * 2);
|
||||
AppendTestUtil.check(fsWithCrc32, p, SEGMENT_LENGTH * 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test which randomly alternates between appending with
|
||||
* CRC32 and with CRC32C, crossing several block boundaries.
|
||||
* Then, checks that all of the data can be read back correct.
|
||||
*/
|
||||
@Test(timeout=RANDOM_TEST_RUNTIME*2)
|
||||
public void testAlgoSwitchRandomized() throws IOException {
|
||||
FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512);
|
||||
FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512);
|
||||
|
||||
Path p = new Path("/testAlgoSwitchRandomized");
|
||||
long seed = System.currentTimeMillis();
|
||||
System.out.println("seed: " + seed);
|
||||
Random r = new Random(seed);
|
||||
|
||||
// Create empty to start
|
||||
IOUtils.closeStream(fsWithCrc32.create(p));
|
||||
|
||||
long st = System.currentTimeMillis();
|
||||
int len = 0;
|
||||
while (System.currentTimeMillis() - st < RANDOM_TEST_RUNTIME) {
|
||||
int thisLen = r.nextInt(500);
|
||||
FileSystem fs = (r.nextBoolean() ? fsWithCrc32 : fsWithCrc32C);
|
||||
FSDataOutputStream stm = fs.append(p);
|
||||
try {
|
||||
AppendTestUtil.write(stm, len, thisLen);
|
||||
} finally {
|
||||
stm.close();
|
||||
}
|
||||
len += thisLen;
|
||||
}
|
||||
|
||||
AppendTestUtil.check(fsWithCrc32, p, len);
|
||||
AppendTestUtil.check(fsWithCrc32C, p, len);
|
||||
}
|
||||
|
||||
private FileSystem createFsWithChecksum(String type, int bytes)
|
||||
throws IOException {
|
||||
Configuration conf = new Configuration(fs.getConf());
|
||||
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, type);
|
||||
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytes);
|
||||
return FileSystem.get(conf);
|
||||
}
|
||||
|
||||
|
||||
private void appendWithTwoFs(Path p, FileSystem fs1, FileSystem fs2)
|
||||
throws IOException {
|
||||
FSDataOutputStream stm = fs1.create(p);
|
||||
try {
|
||||
AppendTestUtil.write(stm, 0, SEGMENT_LENGTH);
|
||||
} finally {
|
||||
stm.close();
|
||||
}
|
||||
|
||||
stm = fs2.append(p);
|
||||
try {
|
||||
AppendTestUtil.write(stm, SEGMENT_LENGTH, SEGMENT_LENGTH);
|
||||
} finally {
|
||||
stm.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -74,7 +74,7 @@ public class TestDataTransferProtocol extends TestCase {
|
||||
"org.apache.hadoop.hdfs.TestDataTransferProtocol");
|
||||
|
||||
private static final DataChecksum DEFAULT_CHECKSUM =
|
||||
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
|
||||
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32C, 512);
|
||||
|
||||
DatanodeID datanode;
|
||||
InetSocketAddress dnAddr;
|
||||
|
@ -204,13 +204,13 @@ public class SimulatedFSDataset implements FSDatasetInterface, Configurable{
|
||||
|
||||
@Override
|
||||
synchronized public BlockWriteStreams createStreams(boolean isCreate,
|
||||
int bytesPerChunk, int checksumSize) throws IOException {
|
||||
DataChecksum requestedChecksum) throws IOException {
|
||||
if (finalized) {
|
||||
throw new IOException("Trying to write to a finalized replica "
|
||||
+ theBlock);
|
||||
} else {
|
||||
SimulatedOutputStream crcStream = new SimulatedOutputStream();
|
||||
return new BlockWriteStreams(oStream, crcStream);
|
||||
return new BlockWriteStreams(oStream, crcStream, requestedChecksum);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,8 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||
// we pass expected len as zero, - fsdataset should use the sizeof actual
|
||||
// data written
|
||||
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
|
||||
BlockWriteStreams out = bInfo.createStreams(true, 512, 4);
|
||||
BlockWriteStreams out = bInfo.createStreams(true,
|
||||
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
|
||||
try {
|
||||
OutputStream dataOut = out.dataOut;
|
||||
assertEquals(0, fsdataset.getLength(b));
|
||||
|
Loading…
x
Reference in New Issue
Block a user