HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.

cnauroth 2014-10-27 09:38:30 -07:00
parent 0058eadbd3
commit 463aec1171
27 changed files with 1152 additions and 707 deletions

FSOutputSummer.java

@ -51,7 +51,7 @@ abstract public class FSOutputSummer extends OutputStream {
protected FSOutputSummer(DataChecksum sum) { protected FSOutputSummer(DataChecksum sum) {
this.sum = sum; this.sum = sum;
this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS]; this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS]; this.checksum = new byte[getChecksumSize() * BUFFER_NUM_CHUNKS];
this.count = 0; this.count = 0;
} }
@ -188,7 +188,12 @@ public void flush() throws IOException {
protected synchronized int getBufferedDataSize() { protected synchronized int getBufferedDataSize() {
return count; return count;
} }
/** @return the size for a checksum. */
protected int getChecksumSize() {
return sum.getChecksumSize();
}
/** Generate checksums for the given data chunks and output chunks & checksums /** Generate checksums for the given data chunks and output chunks & checksums
* to the underlying output stream. * to the underlying output stream.
*/ */
@ -197,9 +202,8 @@ private void writeChecksumChunks(byte b[], int off, int len)
sum.calculateChunkedSums(b, off, len, checksum, 0); sum.calculateChunkedSums(b, off, len, checksum, 0);
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) { for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i); int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize(); int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset, writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
sum.getChecksumSize());
} }
} }
@ -226,8 +230,7 @@ static byte[] int2byte(int integer, byte[] bytes) {
*/ */
protected synchronized void setChecksumBufSize(int size) { protected synchronized void setChecksumBufSize(int size) {
this.buf = new byte[size]; this.buf = new byte[size];
this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) * this.checksum = new byte[sum.getChecksumSize(size)];
sum.getChecksumSize()];
this.count = 0; this.count = 0;
} }
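For reference, a minimal sketch of the buffer arithmetic behind the constructor and writeChecksumChunks changes above. It assumes CRC32C with 512 bytes per checksum and nine buffered chunks (the value of BUFFER_NUM_CHUNKS is an assumption here), and uses only the public DataChecksum API:

import org.apache.hadoop.util.DataChecksum;

public class SummerBufferSizes {
  public static void main(String[] args) {
    DataChecksum sum =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    int bufferNumChunks = 9;  // assumed value of FSOutputSummer.BUFFER_NUM_CHUNKS
    int dataBufLen = sum.getBytesPerChecksum() * bufferNumChunks;   // 512 * 9 = 4608 bytes
    int checksumBufLen = sum.getChecksumSize() * bufferNumChunks;   // 4 * 9 = 36 bytes
    // Checksum offset for the chunk that starts at data offset i:
    int i = 1024;
    int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();  // 2 * 4 = 8
    System.out.println(dataBufLen + " " + checksumBufLen + " " + ckOffset);
  }
}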

Options.java (ChecksumOpt)

@ -234,15 +234,14 @@ public byte value() {
* This is used in FileSystem and FileContext to specify checksum options. * This is used in FileSystem and FileContext to specify checksum options.
*/ */
public static class ChecksumOpt { public static class ChecksumOpt {
private final int crcBlockSize; private final DataChecksum.Type checksumType;
private final DataChecksum.Type crcType; private final int bytesPerChecksum;
/** /**
* Create a uninitialized one * Create a uninitialized one
*/ */
public ChecksumOpt() { public ChecksumOpt() {
crcBlockSize = -1; this(DataChecksum.Type.DEFAULT, -1);
crcType = DataChecksum.Type.DEFAULT;
} }
/** /**
@ -251,16 +250,21 @@ public ChecksumOpt() {
* @param size bytes per checksum * @param size bytes per checksum
*/ */
public ChecksumOpt(DataChecksum.Type type, int size) { public ChecksumOpt(DataChecksum.Type type, int size) {
crcBlockSize = size; checksumType = type;
crcType = type; bytesPerChecksum = size;
} }
public int getBytesPerChecksum() { public int getBytesPerChecksum() {
return crcBlockSize; return bytesPerChecksum;
} }
public DataChecksum.Type getChecksumType() { public DataChecksum.Type getChecksumType() {
return crcType; return checksumType;
}
@Override
public String toString() {
return checksumType + ":" + bytesPerChecksum;
} }
/** /**

NativeIO.java

@ -869,7 +869,8 @@ private static native void link0(String src, String dst)
* @throws IOException * @throws IOException
*/ */
public static void copyFileUnbuffered(File src, File dst) throws IOException { public static void copyFileUnbuffered(File src, File dst) throws IOException {
if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) { if ((nativeLoaded) &&
(Shell.WINDOWS || (Shell.isLinuxSendfileAvailable))) {
copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath()); copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
} else { } else {
FileUtils.copyFile(src, dst); FileUtils.copyFile(src, dst);

DataChecksum.java

@ -37,9 +37,6 @@
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class DataChecksum implements Checksum { public class DataChecksum implements Checksum {
// Misc constants
public static final int HEADER_LEN = 5; /// 1 byte type and 4 byte len
// checksum types // checksum types
public static final int CHECKSUM_NULL = 0; public static final int CHECKSUM_NULL = 0;
public static final int CHECKSUM_CRC32 = 1; public static final int CHECKSUM_CRC32 = 1;
@ -103,7 +100,7 @@ public static DataChecksum newDataChecksum(Type type, int bytesPerChecksum ) {
* @return DataChecksum of the type in the array or null in case of an error. * @return DataChecksum of the type in the array or null in case of an error.
*/ */
public static DataChecksum newDataChecksum( byte bytes[], int offset ) { public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
if ( offset < 0 || bytes.length < offset + HEADER_LEN ) { if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) {
return null; return null;
} }
@ -116,8 +113,8 @@ public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
} }
/** /**
* This constructucts a DataChecksum by reading HEADER_LEN bytes from * This constructs a DataChecksum by reading HEADER_LEN bytes from input
* input stream <i>in</i> * stream <i>in</i>
*/ */
public static DataChecksum newDataChecksum( DataInputStream in ) public static DataChecksum newDataChecksum( DataInputStream in )
throws IOException { throws IOException {
@ -141,7 +138,7 @@ public void writeHeader( DataOutputStream out )
} }
public byte[] getHeader() { public byte[] getHeader() {
byte[] header = new byte[DataChecksum.HEADER_LEN]; byte[] header = new byte[getChecksumHeaderSize()];
header[0] = (byte) (type.id & 0xff); header[0] = (byte) (type.id & 0xff);
// Writing in buffer just like DataOutput.WriteInt() // Writing in buffer just like DataOutput.WriteInt()
header[1+0] = (byte) ((bytesPerChecksum >>> 24) & 0xff); header[1+0] = (byte) ((bytesPerChecksum >>> 24) & 0xff);
@ -229,13 +226,18 @@ private DataChecksum( Type type, Checksum checksum, int chunkSize ) {
bytesPerChecksum = chunkSize; bytesPerChecksum = chunkSize;
} }
// Accessors /** @return the checksum algorithm type. */
public Type getChecksumType() { public Type getChecksumType() {
return type; return type;
} }
/** @return the size for a checksum. */
public int getChecksumSize() { public int getChecksumSize() {
return type.size; return type.size;
} }
/** @return the required checksum size given the data length. */
public int getChecksumSize(int dataSize) {
return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize();
}
public int getBytesPerChecksum() { public int getBytesPerChecksum() {
return bytesPerChecksum; return bytesPerChecksum;
} }
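The new getChecksumSize(int dataSize) overload rounds the data length up to a whole number of chunks and multiplies by the per-checksum size. A minimal sketch of the arithmetic, assuming CRC32C with 512 bytes per checksum:

import org.apache.hadoop.util.DataChecksum;

public class ChecksumSizeDemo {
  public static void main(String[] args) {
    // CRC32C: 4-byte checksum per 512-byte chunk (assumed parameters).
    DataChecksum sum =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    // ((dataSize - 1) / bytesPerChecksum + 1) * checksumSize
    System.out.println(sum.getChecksumSize(1));     // 4  (one partial chunk)
    System.out.println(sum.getChecksumSize(512));   // 4  (exactly one chunk)
    System.out.println(sum.getChecksumSize(513));   // 8  (one full chunk plus one partial)
    System.out.println(sum.getChecksumSize(4096));  // 32 (eight full chunks)
  }
}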

Shell.java

@ -377,6 +377,117 @@ public static final String getWinUtilsPath() {
return winUtilsPath; return winUtilsPath;
} }
public static class LinuxKernelVersion implements Comparable<LinuxKernelVersion>{
private final short major;
private final short minor;
private final short revision;
public LinuxKernelVersion(short major, short minor, short revision) {
this.major = major;
this.minor = minor;
this.revision = revision;
}
/**
* Parse Linux kernel version string from output of POSIX command 'uname -r'
* @param version version string from POSIX command 'uname -r'
* @return LinuxKernelVersion
* @throws IllegalArgumentException
*
* Note:
* On CentOS 5.8: '2.6.18-308.24.1.el5'
* On Ubuntu 14: '3.13.0-32-generic'
*/
public static LinuxKernelVersion parseLinuxKernelVersion(String version)
throws IllegalArgumentException {
if (version == null) {
throw new IllegalArgumentException();
}
String parts[] = version.split("-")[0].split("\\.");
if (parts.length != 3) {
throw new IllegalArgumentException(version);
}
short major = Short.parseShort(parts[0]);
short minor = Short.parseShort(parts[1]);
short revision = Short.parseShort(parts[2]);
return new LinuxKernelVersion(major, minor, revision);
}
@Override
public int compareTo(LinuxKernelVersion o) {
if (this.major == o.major) {
if (this.minor == o.minor) {
return this.revision - o.revision;
} else {
return this.minor - o.minor;
}
} else {
return this.major - o.major;
}
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof LinuxKernelVersion)) {
return false;
}
return compareTo((LinuxKernelVersion) other) == 0;
}
@Override
public String toString() {
return String.format("%d.%d.%d", major, minor, revision);
}
@Override
public int hashCode(){
int hash = 41;
hash = (19 * hash) + major;
hash = (53 * hash) + minor;
hash = (29 * hash) + revision;
return hash;
}
}
/*
* sendfile() API between two file descriptors
* is only supported on Linux Kernel version 2.6.33+
* according to http://man7.org/linux/man-pages/man2/sendfile.2.html
*/
// Initialized before isLinuxSendfileAvailable so the threshold is set when the probe runs.
private static final LinuxKernelVersion minLkvSupportSendfile =
new LinuxKernelVersion((short)2, (short)6, (short)33);
public static final boolean isLinuxSendfileAvailable = isLinuxSendfileSupported();
private static boolean isLinuxSendfileSupported() {
if (!Shell.LINUX) {
return false;
}
ShellCommandExecutor shexec = null;
boolean sendfileSupported = false;
try {
String[] args = {"uname", "bash", "-r"};
shexec = new ShellCommandExecutor(args);
shexec.execute();
String version = shexec.getOutput();
LinuxKernelVersion lkv =
LinuxKernelVersion.parseLinuxKernelVersion(version);
if (lkv.compareTo(minLkvSupportSendfile) >= 0) {
sendfileSupported = true;
}
} catch (Exception e) {
LOG.warn("isLinuxSendfileSupported() failed unexpected: " + e);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("uname exited with exit code "
+ (shexec != null ? shexec.getExitCode() : "(null executor)"));
}
}
return sendfileSupported;
}
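A brief usage sketch of the kernel-version probe added above (illustrative only; the version string is a sample uname -r output, and the 2.6.33 threshold is the one stated in the comment above):

import org.apache.hadoop.util.Shell;

public class SendfileCheckDemo {
  public static void main(String[] args) {
    // Minimum kernel for fd-to-fd sendfile(), per the comment above.
    Shell.LinuxKernelVersion min =
        new Shell.LinuxKernelVersion((short) 2, (short) 6, (short) 33);
    // Sample 'uname -r' output from Ubuntu 14.04.
    Shell.LinuxKernelVersion running =
        Shell.LinuxKernelVersion.parseLinuxKernelVersion("3.13.0-32-generic");
    System.out.println("sendfile usable: " + (running.compareTo(min) >= 0));
  }
}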
public static final boolean isSetsidAvailable = isSetsidSupported(); public static final boolean isSetsidAvailable = isSetsidSupported();
private static boolean isSetsidSupported() { private static boolean isSetsidSupported() {
if (Shell.WINDOWS) { if (Shell.WINDOWS) {

TestShell.java

@ -165,4 +165,24 @@ private void testInterval(long interval) throws IOException {
assertEquals(2, command.getRunCount()); assertEquals(2, command.getRunCount());
} }
} }
public void testLinuxKernelVersion() throws IOException {
Shell.LinuxKernelVersion v2_6_18 =
new Shell.LinuxKernelVersion((short)2, (short)6, (short)18);
Shell.LinuxKernelVersion v2_6_32 =
new Shell.LinuxKernelVersion((short)2, (short)6, (short)32);
assertTrue(v2_6_18.compareTo(v2_6_32) < 0);
}
public void testParseLinuxKernelVersion() throws Exception {
String centOs58Ver = "2.6.18-308.24.1.el5";
String ubuntu14Ver = "3.13.0-32-generic";
Shell.LinuxKernelVersion lkvCentOs58 =
Shell.LinuxKernelVersion.parseLinuxKernelVersion(centOs58Ver);
Shell.LinuxKernelVersion lkvUbuntu14 =
Shell.LinuxKernelVersion.parseLinuxKernelVersion(ubuntu14Ver);
assertTrue(lkvUbuntu14.compareTo(lkvCentOs58) > 0);
assertFalse(lkvUbuntu14.equals(lkvCentOs58));
}
} }

CHANGES.txt

@ -1274,6 +1274,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7090. Use unbuffered writes when persisting in-memory replicas. HDFS-7090. Use unbuffered writes when persisting in-memory replicas.
(Xiaoyu Yao via cnauroth) (Xiaoyu Yao via cnauroth)
HDFS-6934. Move checksum computation off the hot path when writing to RAM
disk. (cnauroth)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

BlockReaderFactory.java

@ -109,6 +109,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
*/ */
private DatanodeInfo datanode; private DatanodeInfo datanode;
/**
* StorageType of replica on DataNode.
*/
private StorageType storageType;
/** /**
* If false, we won't try short-circuit local reads. * If false, we won't try short-circuit local reads.
*/ */
@ -201,6 +206,11 @@ public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
return this; return this;
} }
public BlockReaderFactory setStorageType(StorageType storageType) {
this.storageType = storageType;
return this;
}
public BlockReaderFactory setAllowShortCircuitLocalReads( public BlockReaderFactory setAllowShortCircuitLocalReads(
boolean allowShortCircuitLocalReads) { boolean allowShortCircuitLocalReads) {
this.allowShortCircuitLocalReads = allowShortCircuitLocalReads; this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
@ -353,7 +363,7 @@ private BlockReader getLegacyBlockReaderLocal() throws IOException {
try { try {
return BlockReaderLocalLegacy.newBlockReader(conf, return BlockReaderLocalLegacy.newBlockReader(conf,
userGroupInformation, configuration, fileName, block, token, userGroupInformation, configuration, fileName, block, token,
datanode, startOffset, length); datanode, startOffset, length, storageType);
} catch (RemoteException remoteException) { } catch (RemoteException remoteException) {
ioe = remoteException.unwrapRemoteException( ioe = remoteException.unwrapRemoteException(
InvalidToken.class, AccessControlException.class); InvalidToken.class, AccessControlException.class);
@ -415,6 +425,7 @@ private BlockReader getBlockReaderLocal() throws InvalidToken {
setShortCircuitReplica(info.getReplica()). setShortCircuitReplica(info.getReplica()).
setVerifyChecksum(verifyChecksum). setVerifyChecksum(verifyChecksum).
setCachingStrategy(cachingStrategy). setCachingStrategy(cachingStrategy).
setStorageType(storageType).
build(); build();
} }

BlockReaderLocal.java

@ -69,6 +69,7 @@ public static class Builder {
private ShortCircuitReplica replica; private ShortCircuitReplica replica;
private long dataPos; private long dataPos;
private ExtendedBlock block; private ExtendedBlock block;
private StorageType storageType;
public Builder(Conf conf) { public Builder(Conf conf) {
this.maxReadahead = Integer.MAX_VALUE; this.maxReadahead = Integer.MAX_VALUE;
@ -109,6 +110,11 @@ public Builder setBlock(ExtendedBlock block) {
return this; return this;
} }
public Builder setStorageType(StorageType storageType) {
this.storageType = storageType;
return this;
}
public BlockReaderLocal build() { public BlockReaderLocal build() {
Preconditions.checkNotNull(replica); Preconditions.checkNotNull(replica);
return new BlockReaderLocal(this); return new BlockReaderLocal(this);
@ -212,6 +218,11 @@ public BlockReaderLocal build() {
*/ */
private ByteBuffer checksumBuf; private ByteBuffer checksumBuf;
/**
* StorageType of replica on DataNode.
*/
private StorageType storageType;
private BlockReaderLocal(Builder builder) { private BlockReaderLocal(Builder builder) {
this.replica = builder.replica; this.replica = builder.replica;
this.dataIn = replica.getDataStream().getChannel(); this.dataIn = replica.getDataStream().getChannel();
@ -240,6 +251,7 @@ private BlockReaderLocal(Builder builder) {
this.zeroReadaheadRequested = false; this.zeroReadaheadRequested = false;
} }
this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
this.storageType = builder.storageType;
} }
private synchronized void createDataBufIfNeeded() { private synchronized void createDataBufIfNeeded() {
@ -333,8 +345,8 @@ private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
checksumBuf.clear(); checksumBuf.clear();
checksumBuf.limit(checksumsNeeded * checksumSize); checksumBuf.limit(checksumsNeeded * checksumSize);
long checksumPos = long checksumPos = BlockMetadataHeader.getHeaderSize()
7 + ((startDataPos / bytesPerChecksum) * checksumSize); + ((startDataPos / bytesPerChecksum) * checksumSize);
while (checksumBuf.hasRemaining()) { while (checksumBuf.hasRemaining()) {
int nRead = checksumIn.read(checksumBuf, checksumPos); int nRead = checksumIn.read(checksumBuf, checksumPos);
if (nRead < 0) { if (nRead < 0) {
@ -359,7 +371,14 @@ private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
private boolean createNoChecksumContext() { private boolean createNoChecksumContext() {
if (verifyChecksum) { if (verifyChecksum) {
return replica.addNoChecksumAnchor(); if (storageType != null && storageType.isTransient()) {
// Checksums are not stored for replicas on transient storage. We do not
// anchor, because we do not intend for client activity to block eviction
// from transient storage on the DataNode side.
return true;
} else {
return replica.addNoChecksumAnchor();
}
} else { } else {
return true; return true;
} }
@ -367,7 +386,9 @@ private boolean createNoChecksumContext() {
private void releaseNoChecksumContext() { private void releaseNoChecksumContext() {
if (verifyChecksum) { if (verifyChecksum) {
replica.removeNoChecksumAnchor(); if (storageType == null || !storageType.isTransient()) {
replica.removeNoChecksumAnchor();
}
} }
} }

BlockReaderLocalLegacy.java

@ -181,7 +181,8 @@ static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
UserGroupInformation userGroupInformation, UserGroupInformation userGroupInformation,
Configuration configuration, String file, ExtendedBlock blk, Configuration configuration, String file, ExtendedBlock blk,
Token<BlockTokenIdentifier> token, DatanodeInfo node, Token<BlockTokenIdentifier> token, DatanodeInfo node,
long startOffset, long length) throws IOException { long startOffset, long length, StorageType storageType)
throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort()); .getIpcPort());
// check the cache first // check the cache first
@ -192,7 +193,7 @@ static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
} }
pathinfo = getBlockPathInfo(userGroupInformation, blk, node, pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
configuration, conf.socketTimeout, token, configuration, conf.socketTimeout, token,
conf.connectToDnViaHostname); conf.connectToDnViaHostname, storageType);
} }
// check to see if the file exists. It may so happen that the // check to see if the file exists. It may so happen that the
@ -204,7 +205,8 @@ static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
FileInputStream dataIn = null; FileInputStream dataIn = null;
FileInputStream checksumIn = null; FileInputStream checksumIn = null;
BlockReaderLocalLegacy localBlockReader = null; BlockReaderLocalLegacy localBlockReader = null;
boolean skipChecksumCheck = conf.skipShortCircuitChecksums; boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
storageType.isTransient();
try { try {
// get a local file system // get a local file system
File blkfile = new File(pathinfo.getBlockPath()); File blkfile = new File(pathinfo.getBlockPath());
@ -221,15 +223,8 @@ static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
File metafile = new File(pathinfo.getMetaPath()); File metafile = new File(pathinfo.getMetaPath());
checksumIn = new FileInputStream(metafile); checksumIn = new FileInputStream(metafile);
// read and handle the common header here. For now just a version final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
BlockMetadataHeader header = BlockMetadataHeader new DataInputStream(checksumIn), blk);
.readHeader(new DataInputStream(checksumIn));
short version = header.getVersion();
if (version != BlockMetadataHeader.VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ blk + " ignoring ...");
}
DataChecksum checksum = header.getChecksum();
long firstChunkOffset = startOffset long firstChunkOffset = startOffset
- (startOffset % checksum.getBytesPerChecksum()); - (startOffset % checksum.getBytesPerChecksum());
localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token, localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
@ -270,8 +265,8 @@ private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi, private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname) Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
throws IOException { StorageType storageType) throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort()); LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
BlockLocalPathInfo pathinfo = null; BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node, ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
@ -279,7 +274,15 @@ private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
try { try {
// make RPC to local datanode to find local pathnames of blocks // make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token); pathinfo = proxy.getBlockLocalPathInfo(blk, token);
if (pathinfo != null) { // We cannot cache the path information for a replica on transient storage.
// If the replica gets evicted, then it moves to a different path. Then,
// our next attempt to read from the cached path would fail to find the
// file. Additionally, the failure would cause us to disable legacy
// short-circuit read for all subsequent use in the ClientContext. Unlike
// the newer short-circuit read implementation, we have no communication
// channel for the DataNode to notify the client that the path has been
// invalidated. Therefore, our only option is to skip caching.
if (pathinfo != null && !storageType.isTransient()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Cached location of block " + blk + " as " + pathinfo); LOG.debug("Cached location of block " + blk + " as " + pathinfo);
} }

DFSClient.java

@ -97,6 +97,7 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CipherSuite;
@ -519,8 +520,7 @@ private DataChecksum createChecksum() throws IOException {
return createChecksum(null); return createChecksum(null);
} }
private DataChecksum createChecksum(ChecksumOpt userOpt) private DataChecksum createChecksum(ChecksumOpt userOpt) {
throws IOException {
// Fill in any missing field with the default. // Fill in any missing field with the default.
ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt( ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
defaultChecksumOpt, userOpt); defaultChecksumOpt, userOpt);
@ -528,8 +528,9 @@ private DataChecksum createChecksum(ChecksumOpt userOpt)
myOpt.getChecksumType(), myOpt.getChecksumType(),
myOpt.getBytesPerChecksum()); myOpt.getBytesPerChecksum());
if (dataChecksum == null) { if (dataChecksum == null) {
throw new IOException("Invalid checksum type specified: " throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
+ myOpt.getChecksumType().name()); + userOpt + ", default=" + defaultChecksumOpt
+ ", effective=null");
} }
return dataChecksum; return dataChecksum;
} }

DFSInputStream.java

@ -23,6 +23,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
@ -570,6 +571,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
DNAddrPair retval = chooseDataNode(targetBlock, null); DNAddrPair retval = chooseDataNode(targetBlock, null);
chosenNode = retval.info; chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr; InetSocketAddress targetAddr = retval.addr;
StorageType storageType = retval.storageType;
try { try {
ExtendedBlock blk = targetBlock.getBlock(); ExtendedBlock blk = targetBlock.getBlock();
@ -578,6 +580,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
setInetSocketAddress(targetAddr). setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient). setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode). setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src). setFileName(src).
setBlock(blk). setBlock(blk).
setBlockToken(accessToken). setBlockToken(accessToken).
@ -885,12 +888,11 @@ private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
private DNAddrPair chooseDataNode(LocatedBlock block, private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException { Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) { while (true) {
DatanodeInfo[] nodes = block.getLocations();
try { try {
return getBestNodeDNAddrPair(nodes, ignoredNodes); return getBestNodeDNAddrPair(block, ignoredNodes);
} catch (IOException ie) { } catch (IOException ie) {
String errMsg = String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes); deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src; String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getMaxBlockAcquireFailures()) { if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo; String description = "Could not obtain block: " + blockInfo;
@ -899,7 +901,8 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
throw new BlockMissingException(src, description, throw new BlockMissingException(src, description,
block.getStartOffset()); block.getStartOffset());
} }
DatanodeInfo[] nodes = block.getLocations();
if (nodes == null || nodes.length == 0) { if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo); DFSClient.LOG.info("No node available for " + blockInfo);
} }
@ -933,22 +936,44 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
} }
/** /**
* Get the best node. * Get the best node from which to stream the data.
* @param nodes Nodes to choose from. * @param block LocatedBlock, containing nodes in priority order.
* @param ignoredNodes Do not chose nodes in this array (may be null) * @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node. * @return The DNAddrPair of the best node.
* @throws IOException * @throws IOException
*/ */
private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes, private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException { Collection<DatanodeInfo> ignoredNodes) throws IOException {
DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes); DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
DatanodeInfo chosenNode = null;
StorageType storageType = null;
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
chosenNode = nodes[i];
// Storage types are ordered to correspond with nodes, so use the same
// index to get storage type.
if (storageTypes != null && i < storageTypes.length) {
storageType = storageTypes[i];
}
break;
}
}
}
if (chosenNode == null) {
throw new IOException("No live nodes contain block " + block.getBlock() +
" after checking nodes = " + Arrays.toString(nodes) +
", ignoredNodes = " + ignoredNodes);
}
final String dnAddr = final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname); chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr); DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
} }
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr); InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr); return new DNAddrPair(chosenNode, targetAddr, storageType);
} }
private static String getBestNodeDNAddrPairErrorString( private static String getBestNodeDNAddrPairErrorString(
@ -1039,6 +1064,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
} }
DatanodeInfo chosenNode = datanode.info; DatanodeInfo chosenNode = datanode.info;
InetSocketAddress targetAddr = datanode.addr; InetSocketAddress targetAddr = datanode.addr;
StorageType storageType = datanode.storageType;
BlockReader reader = null; BlockReader reader = null;
try { try {
@ -1049,6 +1075,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
setInetSocketAddress(targetAddr). setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient). setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode). setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src). setFileName(src).
setBlock(block.getBlock()). setBlock(block.getBlock()).
setBlockToken(blockToken). setBlockToken(blockToken).
@ -1174,7 +1201,7 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
// If no nodes to do hedged reads against, pass. // If no nodes to do hedged reads against, pass.
try { try {
try { try {
chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored); chosenNode = getBestNodeDNAddrPair(block, ignored);
} catch (IOException ioe) { } catch (IOException ioe) {
chosenNode = chooseDataNode(block, ignored); chosenNode = chooseDataNode(block, ignored);
} }
@ -1529,31 +1556,17 @@ public void reset() throws IOException {
throw new IOException("Mark/reset not supported"); throw new IOException("Mark/reset not supported");
} }
/**
* Pick the best node from which to stream the data.
* Entries in <i>nodes</i> are already in the priority order
*/
static DatanodeInfo bestNode(DatanodeInfo nodes[],
AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
return nodes[i];
}
}
}
throw new IOException("No live nodes contain current block");
}
/** Utility class to encapsulate data node info and its address. */ /** Utility class to encapsulate data node info and its address. */
static class DNAddrPair { private static final class DNAddrPair {
final DatanodeInfo info; final DatanodeInfo info;
final InetSocketAddress addr; final InetSocketAddress addr;
DNAddrPair(DatanodeInfo info, InetSocketAddress addr) { final StorageType storageType;
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
StorageType storageType) {
this.info = info; this.info = info;
this.addr = addr; this.addr = addr;
this.storageType = storageType;
} }
} }

DFSOutputStream.java

@ -42,6 +42,8 @@
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
@ -89,9 +91,9 @@
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.htrace.Span; import org.htrace.Span;
import org.htrace.Trace; import org.htrace.Trace;
import org.htrace.TraceScope; import org.htrace.TraceScope;
@ -148,7 +150,10 @@ public class DFSOutputStream extends FSOutputSummer
private String src; private String src;
private final long fileId; private final long fileId;
private final long blockSize; private final long blockSize;
private final DataChecksum checksum; /** Only for DataTransferProtocol.writeBlock(..) */
private final DataChecksum checksum4WriteBlock;
private final int bytesPerChecksum;
// both dataQueue and ackQueue are protected by dataQueue lock // both dataQueue and ackQueue are protected by dataQueue lock
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>(); private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
@ -245,6 +250,9 @@ void writeData(byte[] inarray, int off, int len) {
} }
void writeChecksum(byte[] inarray, int off, int len) { void writeChecksum(byte[] inarray, int off, int len) {
if (len == 0) {
return;
}
if (checksumPos + len > dataStart) { if (checksumPos + len > dataStart) {
throw new BufferOverflowException(); throw new BufferOverflowException();
} }
@ -377,19 +385,12 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
private final Span traceSpan; private final Span traceSpan;
/**
* Default construction for file create
*/
private DataStreamer(HdfsFileStatus stat) {
this(stat, null);
}
/** /**
* construction with tracing info * construction with tracing info
*/ */
private DataStreamer(HdfsFileStatus stat, Span span) { private DataStreamer(HdfsFileStatus stat, Span span) {
isAppend = false; isAppend = false;
isLazyPersistFile = initLazyPersist(stat); isLazyPersistFile = isLazyPersist(stat);
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span; traceSpan = span;
} }
@ -409,7 +410,7 @@ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
block = lastBlock.getBlock(); block = lastBlock.getBlock();
bytesSent = block.getNumBytes(); bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken(); accessToken = lastBlock.getBlockToken();
isLazyPersistFile = initLazyPersist(stat); isLazyPersistFile = isLazyPersist(stat);
long usedInLastBlock = stat.getLen() % blockSize; long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock); int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@ -452,13 +453,6 @@ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
} }
} }
private boolean initLazyPersist(HdfsFileStatus stat) {
final BlockStoragePolicy lpPolicy = blockStoragePolicySuite
.getPolicy(HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
return lpPolicy != null &&
stat.getStoragePolicy() == lpPolicy.getId();
}
private void setPipeline(LocatedBlock lb) { private void setPipeline(LocatedBlock lb) {
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
@ -553,7 +547,7 @@ public void run() {
} }
// get packet to be sent. // get packet to be sent.
if (dataQueue.isEmpty()) { if (dataQueue.isEmpty()) {
one = new Packet(checksum.getChecksumSize()); // heartbeat packet one = new Packet(getChecksumSize()); // heartbeat packet
} else { } else {
one = dataQueue.getFirst(); // regular data packet one = dataQueue.getFirst(); // regular data packet
} }
@ -1408,8 +1402,8 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
// send the request // send the request
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken, new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum, nodes.length, block.getNumBytes(), bytesSent, newGS,
cachingStrategy.get(), isLazyPersistFile); checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
// receive ack for connect // receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@ -1618,9 +1612,23 @@ public synchronized DatanodeInfo[] getPipeline() {
return value; return value;
} }
/**
* @return the object for computing checksum.
* The type is NULL if checksum is not computed.
*/
private static DataChecksum getChecksum4Compute(DataChecksum checksum,
HdfsFileStatus stat) {
if (isLazyPersist(stat) && stat.getReplication() == 1) {
// do not compute checksum when writing a single replica to memory
return DataChecksum.newDataChecksum(Type.NULL,
checksum.getBytesPerChecksum());
}
return checksum;
}
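A minimal sketch of the effect of getChecksum4Compute above: for a lazy-persist file with replication 1, the summer receives a NULL checksum whose size is 0, so no checksum bytes are computed or buffered on the write hot path, while checksum4WriteBlock keeps the real checksum for pipeline setup. CRC32C with 512 bytes per checksum is assumed for the on-disk checksum:

import org.apache.hadoop.util.DataChecksum;

public class NullChecksumDemo {
  public static void main(String[] args) {
    DataChecksum disk =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    // Same chunk size, but type NULL: nothing to compute on the hot path.
    DataChecksum hotPath =
        DataChecksum.newDataChecksum(DataChecksum.Type.NULL,
            disk.getBytesPerChecksum());
    System.out.println(hotPath.getChecksumSize());      // 0
    System.out.println(hotPath.getChecksumSize(4096));  // 0 checksum bytes needed
  }
}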
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
HdfsFileStatus stat, DataChecksum checksum) throws IOException { HdfsFileStatus stat, DataChecksum checksum) throws IOException {
super(checksum); super(getChecksum4Compute(checksum, stat));
this.dfsClient = dfsClient; this.dfsClient = dfsClient;
this.src = src; this.src = src;
this.fileId = stat.getFileId(); this.fileId = stat.getFileId();
@ -1635,15 +1643,18 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
"Set non-null progress callback on DFSOutputStream " + src); "Set non-null progress callback on DFSOutputStream " + src);
} }
final int bytesPerChecksum = checksum.getBytesPerChecksum(); this.bytesPerChecksum = checksum.getBytesPerChecksum();
if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) { if (bytesPerChecksum <= 0) {
throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum + throw new HadoopIllegalArgumentException(
") and blockSize(" + blockSize + "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
") do not match. " + "blockSize should be a " +
"multiple of io.bytes.per.checksum");
} }
this.checksum = checksum; if (blockSize % bytesPerChecksum != 0) {
throw new HadoopIllegalArgumentException("Invalid values: "
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+ ") must divide block size (=" + blockSize + ").");
}
this.checksum4WriteBlock = checksum;
this.dfsclientSlowLogThresholdMs = this.dfsclientSlowLogThresholdMs =
dfsClient.getConf().dfsclientSlowIoWarningThresholdMs; dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
} }
@ -1655,8 +1666,7 @@ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
this(dfsClient, src, progress, stat, checksum); this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().writePacketSize, computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
checksum.getBytesPerChecksum());
Span traceSpan = null; Span traceSpan = null;
if (Trace.isTracing()) { if (Trace.isTracing()) {
@ -1734,11 +1744,9 @@ private DFSOutputStream(DFSClient dfsClient, String src,
if (lastBlock != null) { if (lastBlock != null) {
// indicate that we are appending to an existing block // indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize(); bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
checksum.getBytesPerChecksum(), traceSpan);
} else { } else {
computePacketChunkSize(dfsClient.getConf().writePacketSize, computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
checksum.getBytesPerChecksum());
streamer = new DataStreamer(stat, traceSpan); streamer = new DataStreamer(stat, traceSpan);
} }
this.fileEncryptionInfo = stat.getFileEncryptionInfo(); this.fileEncryptionInfo = stat.getFileEncryptionInfo();
@ -1752,9 +1760,15 @@ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
out.start(); out.start();
return out; return out;
} }
private static boolean isLazyPersist(HdfsFileStatus stat) {
final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
return p != null && stat.getStoragePolicy() == p.getId();
}
private void computePacketChunkSize(int psize, int csize) { private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize(); final int chunkSize = csize + getChecksumSize();
chunksPerPacket = Math.max(psize/chunkSize, 1); chunksPerPacket = Math.max(psize/chunkSize, 1);
packetSize = chunkSize*chunksPerPacket; packetSize = chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
@ -1811,21 +1825,19 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
dfsClient.checkOpen(); dfsClient.checkOpen();
checkClosed(); checkClosed();
int bytesPerChecksum = this.checksum.getBytesPerChecksum();
if (len > bytesPerChecksum) { if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len + throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " + " is larger than supported bytesPerChecksum " +
bytesPerChecksum); bytesPerChecksum);
} }
if (cklen != this.checksum.getChecksumSize()) { if (cklen != 0 && cklen != getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " + throw new IOException("writeChunk() checksum size is supposed to be " +
this.checksum.getChecksumSize() + getChecksumSize() + " but found to be " + cklen);
" but found to be " + cklen);
} }
if (currentPacket == null) { if (currentPacket == null) {
currentPacket = new Packet(packetSize, chunksPerPacket, currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); bytesCurBlock, currentSeqno++, getChecksumSize());
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno + currentPacket.seqno +
@ -1873,7 +1885,7 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
// //
if (bytesCurBlock == blockSize) { if (bytesCurBlock == blockSize) {
currentPacket = new Packet(0, 0, bytesCurBlock, currentPacket = new Packet(0, 0, bytesCurBlock,
currentSeqno++, this.checksum.getChecksumSize()); currentSeqno++, getChecksumSize());
currentPacket.lastPacketInBlock = true; currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock; currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket(); waitAndQueueCurrentPacket();
@ -1961,7 +1973,7 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
// but sync was requested. // but sync was requested.
// Send an empty packet // Send an empty packet
currentPacket = new Packet(packetSize, chunksPerPacket, currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); bytesCurBlock, currentSeqno++, getChecksumSize());
} }
} else { } else {
if (isSync && bytesCurBlock > 0) { if (isSync && bytesCurBlock > 0) {
@ -1970,7 +1982,7 @@ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
// and sync was requested. // and sync was requested.
// So send an empty sync packet. // So send an empty sync packet.
currentPacket = new Packet(packetSize, chunksPerPacket, currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); bytesCurBlock, currentSeqno++, getChecksumSize());
} else { } else {
// just discard the current packet since it is already been sent. // just discard the current packet since it is already been sent.
currentPacket = null; currentPacket = null;
@ -2174,8 +2186,7 @@ public synchronized void close() throws IOException {
if (bytesCurBlock != 0) { if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block // send an empty packet to mark the end of the block
currentPacket = new Packet(0, 0, bytesCurBlock, currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
currentSeqno++, this.checksum.getChecksumSize());
currentPacket.lastPacketInBlock = true; currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock; currentPacket.syncBlock = shouldSyncBlock;
} }
@ -2239,8 +2250,7 @@ public void setArtificialSlowdown(long period) {
@VisibleForTesting @VisibleForTesting
public synchronized void setChunksPerPacket(int value) { public synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value); chunksPerPacket = Math.min(chunksPerPacket, value);
packetSize = (checksum.getBytesPerChecksum() + packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
checksum.getChecksumSize()) * chunksPerPacket;
} }
synchronized void setTestFilename(String newname) { synchronized void setTestFilename(String newname) {

LocatedBlock.java

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.protocol; package org.apache.hadoop.hdfs.protocol;
import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -185,7 +186,11 @@ public String toString() {
+ "; getBlockSize()=" + getBlockSize() + "; getBlockSize()=" + getBlockSize()
+ "; corrupt=" + corrupt + "; corrupt=" + corrupt
+ "; offset=" + offset + "; offset=" + offset
+ "; locs=" + java.util.Arrays.asList(locs) + "; locs=" + Arrays.asList(locs)
+ "; storageIDs=" +
(storageIDs != null ? Arrays.asList(storageIDs) : null)
+ "; storageTypes=" +
(storageTypes != null ? Arrays.asList(storageTypes) : null)
+ "}"; + "}";
} }
} }

BlockMetadataHeader.java

@ -29,10 +29,13 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import org.apache.hadoop.io.IOUtils; import org.apache.commons.logging.Log;
import org.apache.hadoop.util.DataChecksum; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -46,6 +49,7 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class BlockMetadataHeader { public class BlockMetadataHeader {
private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class);
public static final short VERSION = 1; public static final short VERSION = 1;
@ -73,6 +77,37 @@ public DataChecksum getChecksum() {
return checksum; return checksum;
} }
/**
* Read the checksum header from the meta file.
* @return the data checksum obtained from the header.
*/
public static DataChecksum readDataChecksum(File metaFile) throws IOException {
DataInputStream in = null;
try {
in = new DataInputStream(new BufferedInputStream(
new FileInputStream(metaFile), HdfsConstants.IO_FILE_BUFFER_SIZE));
return readDataChecksum(in, metaFile);
} finally {
IOUtils.closeStream(in);
}
}
/**
* Read the checksum header from the meta input stream.
* @return the data checksum obtained from the header.
*/
public static DataChecksum readDataChecksum(final DataInputStream metaIn,
final Object name) throws IOException {
// read and handle the common header here. For now just a version
final BlockMetadataHeader header = readHeader(metaIn);
if (header.getVersion() != VERSION) {
LOG.warn("Unexpected meta-file version for " + name
+ ": version in file is " + header.getVersion()
+ " but expected version is " + VERSION);
}
return header.getChecksum();
}
/** /**
* Read the header without changing the position of the FileChannel. * Read the header without changing the position of the FileChannel.
* *
@ -82,7 +117,7 @@ public DataChecksum getChecksum() {
*/ */
public static BlockMetadataHeader preadHeader(FileChannel fc) public static BlockMetadataHeader preadHeader(FileChannel fc)
throws IOException { throws IOException {
byte arr[] = new byte[2 + DataChecksum.HEADER_LEN]; final byte arr[] = new byte[getHeaderSize()];
ByteBuffer buf = ByteBuffer.wrap(arr); ByteBuffer buf = ByteBuffer.wrap(arr);
while (buf.hasRemaining()) { while (buf.hasRemaining()) {
@ -158,7 +193,7 @@ public static void writeHeader(DataOutputStream out,
* Writes all the fields till the beginning of checksum. * Writes all the fields till the beginning of checksum.
* @throws IOException on error * @throws IOException on error
*/ */
static void writeHeader(DataOutputStream out, DataChecksum checksum) public static void writeHeader(DataOutputStream out, DataChecksum checksum)
throws IOException { throws IOException {
writeHeader(out, new BlockMetadataHeader(VERSION, checksum)); writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
} }

BlockReceiver.java

@ -82,12 +82,12 @@ class BlockReceiver implements Closeable {
* checksum polynomial than the block is stored with on disk, * checksum polynomial than the block is stored with on disk,
* the DataNode needs to recalculate checksums before writing. * the DataNode needs to recalculate checksums before writing.
*/ */
private boolean needsChecksumTranslation; private final boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd; private FileDescriptor outFd;
private DataOutputStream checksumOut = null; // to crc file at local disk private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum; private final int bytesPerChecksum;
private int checksumSize; private final int checksumSize;
private final PacketReceiver packetReceiver = new PacketReceiver(false); private final PacketReceiver packetReceiver = new PacketReceiver(false);
@ -99,7 +99,6 @@ class BlockReceiver implements Closeable {
private DataTransferThrottler throttler; private DataTransferThrottler throttler;
private ReplicaOutputStreams streams; private ReplicaOutputStreams streams;
private DatanodeInfo srcDataNode = null; private DatanodeInfo srcDataNode = null;
private Checksum partialCrc = null;
private final DataNode datanode; private final DataNode datanode;
volatile private boolean mirrorError; volatile private boolean mirrorError;
@ -490,7 +489,7 @@ private int receivePacket() throws IOException {
long offsetInBlock = header.getOffsetInBlock(); long offsetInBlock = header.getOffsetInBlock();
long seqno = header.getSeqno(); long seqno = header.getSeqno();
boolean lastPacketInBlock = header.isLastPacketInBlock(); boolean lastPacketInBlock = header.isLastPacketInBlock();
int len = header.getDataLen(); final int len = header.getDataLen();
boolean syncBlock = header.getSyncBlock(); boolean syncBlock = header.getSyncBlock();
// avoid double sync'ing on close // avoid double sync'ing on close
@ -499,7 +498,7 @@ private int receivePacket() throws IOException {
} }
// update received bytes // update received bytes
long firstByteInBlock = offsetInBlock; final long firstByteInBlock = offsetInBlock;
offsetInBlock += len; offsetInBlock += len;
if (replicaInfo.getNumBytes() < offsetInBlock) { if (replicaInfo.getNumBytes() < offsetInBlock) {
replicaInfo.setNumBytes(offsetInBlock); replicaInfo.setNumBytes(offsetInBlock);
@ -539,16 +538,15 @@ private int receivePacket() throws IOException {
flushOrSync(true); flushOrSync(true);
} }
} else { } else {
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)* final int checksumLen = diskChecksum.getChecksumSize(len);
checksumSize; final int checksumReceivedLen = checksumBuf.capacity();
if ( checksumBuf.capacity() != checksumLen) { if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
throw new IOException("Length of checksums in packet " + throw new IOException("Invalid checksum length: received length is "
checksumBuf.capacity() + " does not match calculated checksum " + + checksumReceivedLen + " but expected length is " + checksumLen);
"length " + checksumLen);
} }
if (shouldVerifyChecksum()) { if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
try { try {
verifyChunks(dataBuf, checksumBuf); verifyChunks(dataBuf, checksumBuf);
} catch (IOException ioe) { } catch (IOException ioe) {
@ -572,11 +570,17 @@ private int receivePacket() throws IOException {
translateChunks(dataBuf, checksumBuf); translateChunks(dataBuf, checksumBuf);
} }
} }
if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
// checksum is missing, need to calculate it
checksumBuf = ByteBuffer.allocate(checksumLen);
diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
}
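A small sketch of what the branch above does when a packet arrives without checksums and the replica is not on transient storage: the DataNode recomputes the chunked checksums itself before writing them to the meta file. CRC32C with 512 bytes per checksum and a 4 KB payload are assumed:

import java.nio.ByteBuffer;
import org.apache.hadoop.util.DataChecksum;

public class RecomputeChecksumDemo {
  public static void main(String[] args) {
    DataChecksum diskChecksum =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    ByteBuffer dataBuf = ByteBuffer.wrap(new byte[4096]);          // packet payload
    ByteBuffer checksumBuf =
        ByteBuffer.allocate(diskChecksum.getChecksumSize(4096));   // 32 bytes for 8 chunks
    diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);       // fill in the checksums
    System.out.println("recomputed " + checksumBuf.capacity() + " checksum bytes");
  }
}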
// by this point, the data in the buffer uses the disk checksum // by this point, the data in the buffer uses the disk checksum
byte[] lastChunkChecksum; final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
&& streams.isTransientStorage();
try { try {
long onDiskLen = replicaInfo.getBytesOnDisk(); long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) { if (onDiskLen<offsetInBlock) {
@ -588,14 +592,16 @@ private int receivePacket() throws IOException {
} }
// If this is a partial chunk, then read in pre-existing checksum // If this is a partial chunk, then read in pre-existing checksum
if (firstByteInBlock % bytesPerChecksum != 0) { Checksum partialCrc = null;
LOG.info("Packet starts at " + firstByteInBlock + if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
" for " + block + if (LOG.isDebugEnabled()) {
" which is not a multiple of bytesPerChecksum " + LOG.debug("receivePacket for " + block
bytesPerChecksum); + ": bytesPerChecksum=" + bytesPerChecksum
+ " does not divide firstByteInBlock=" + firstByteInBlock);
}
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() + long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
onDiskLen / bytesPerChecksum * checksumSize; onDiskLen / bytesPerChecksum * checksumSize;
computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum); partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
} }
int startByteToDisk = (int)(onDiskLen-firstByteInBlock) int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
@ -612,41 +618,40 @@ private int receivePacket() throws IOException {
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
} }
// If this is a partial chunk, then verify that this is the only final byte[] lastCrc;
// chunk in the packet. Calculate new crc for this chunk. if (shouldNotWriteChecksum) {
if (partialCrc != null) { lastCrc = null;
} else if (partialCrc != null) {
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
if (len > bytesPerChecksum) { if (len > bytesPerChecksum) {
throw new IOException("Got wrong length during writeBlock(" + throw new IOException("Unexpected packet data length for "
block + ") from " + inAddr + " " + + block + " from " + inAddr + ": a partial chunk must be "
"A packet can have only one partial chunk."+ + " sent in an individual packet (data length = " + len
" len = " + len + + " > bytesPerChecksum = " + bytesPerChecksum + ")");
" bytesPerChecksum " + bytesPerChecksum);
} }
partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk); partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize); byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
lastChunkChecksum = Arrays.copyOfRange( lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
buf, buf.length - checksumSize, buf.length
);
checksumOut.write(buf); checksumOut.write(buf);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Writing out partial crc for data len " + len); LOG.debug("Writing out partial crc for data len " + len);
} }
partialCrc = null; partialCrc = null;
} else { } else {
lastChunkChecksum = Arrays.copyOfRange( // write checksum
checksumBuf.array(), final int offset = checksumBuf.arrayOffset() +
checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize, checksumBuf.position();
checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen); final int end = offset + checksumLen;
checksumOut.write(checksumBuf.array(), lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
checksumBuf.arrayOffset() + checksumBuf.position(), end);
checksumLen); checksumOut.write(checksumBuf.array(), offset, checksumLen);
} }
/// flush entire packet, sync if requested /// flush entire packet, sync if requested
flushOrSync(syncBlock); flushOrSync(syncBlock);
replicaInfo.setLastChecksumAndDataLen( replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
offsetInBlock, lastChunkChecksum
);
datanode.metrics.incrBytesWritten(len); datanode.metrics.incrBytesWritten(len);
@ -686,6 +691,10 @@ private int receivePacket() throws IOException {
return lastPacketInBlock?-1:len; return lastPacketInBlock?-1:len;
} }
private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
return Arrays.copyOfRange(array, end - size, end);
}
private void manageWriterOsCache(long offsetInBlock) { private void manageWriterOsCache(long offsetInBlock) {
try { try {
if (outFd != null && if (outFd != null &&
@ -921,18 +930,19 @@ static private long checksum2long(byte[] checksum) {
* reads in the partial crc chunk and computes checksum * reads in the partial crc chunk and computes checksum
* of pre-existing data in partial chunk. * of pre-existing data in partial chunk.
*/ */
private void computePartialChunkCrc(long blkoff, long ckoff, private Checksum computePartialChunkCrc(long blkoff, long ckoff)
int bytesPerChecksum) throws IOException { throws IOException {
// find offset of the beginning of partial chunk. // find offset of the beginning of partial chunk.
// //
int sizePartialChunk = (int) (blkoff % bytesPerChecksum); int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
int checksumSize = diskChecksum.getChecksumSize();
blkoff = blkoff - sizePartialChunk; blkoff = blkoff - sizePartialChunk;
LOG.info("computePartialChunkCrc sizePartialChunk " + if (LOG.isDebugEnabled()) {
sizePartialChunk + " " + block + LOG.debug("computePartialChunkCrc for " + block
" block offset " + blkoff + + ": sizePartialChunk=" + sizePartialChunk
" metafile offset " + ckoff); + ", block offset=" + blkoff
+ ", metafile offset=" + ckoff);
}
// create an input stream from the block file // create an input stream from the block file
// and read in partial crc chunk into temporary buffer // and read in partial crc chunk into temporary buffer
@ -951,10 +961,12 @@ private void computePartialChunkCrc(long blkoff, long ckoff,
} }
// compute crc of partial chunk from data read in the block file. // compute crc of partial chunk from data read in the block file.
partialCrc = DataChecksum.newDataChecksum( final Checksum partialCrc = DataChecksum.newDataChecksum(
diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum()); diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
partialCrc.update(buf, 0, sizePartialChunk); partialCrc.update(buf, 0, sizePartialChunk);
LOG.info("Read in partial CRC chunk from disk for " + block); if (LOG.isDebugEnabled()) {
LOG.debug("Read in partial CRC chunk from disk for " + block);
}
// paranoia! verify that the pre-computed crc matches what we // paranoia! verify that the pre-computed crc matches what we
// recalculated just now // recalculated just now
@ -965,6 +977,7 @@ private void computePartialChunkCrc(long blkoff, long ckoff,
checksum2long(crcbuf); checksum2long(crcbuf);
throw new IOException(msg); throw new IOException(msg);
} }
return partialCrc;
} }
private static enum PacketResponderType { private static enum PacketResponderType {
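The interleaved hunks above are hard to scan, so here is a minimal sketch of the decision BlockReceiver#receivePacket now makes when a packet arrives. The helper name and parameter list are hypothetical; it only restates the patch, and the partial-chunk path (computePartialChunkCrc) is omitted.

import java.nio.ByteBuffer;
import org.apache.hadoop.util.DataChecksum;

class ReceivePacketChecksumSketch {
  // Hypothetical helper, not part of the patch; it summarizes the new behavior.
  // Returns the checksums to write to the meta file, or null when nothing
  // should be written (no checksums received and the replica is on RAM disk).
  static ByteBuffer checksumsToWrite(ByteBuffer dataBuf, ByteBuffer checksumBuf,
      int checksumReceivedLen, int checksumLen, boolean isTransientStorage,
      DataChecksum diskChecksum) {
    if (checksumReceivedLen == 0 && isTransientStorage) {
      // Hot path for RAM disk writes: no verification, no checksum I/O.
      return null;
    }
    if (checksumReceivedLen == 0) {
      // Client skipped checksums but the target is durable: compute them here.
      ByteBuffer computed = ByteBuffer.allocate(checksumLen);
      diskChecksum.calculateChunkedSums(dataBuf, computed);
      return computed;
    }
    // Client-supplied checksums are verified (if enabled) and written as-is.
    return checksumBuf;
  }
}

In the real method the result feeds checksumOut and replicaInfo.setLastChecksumAndDataLen, as the hunks above show.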

@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
@ -265,26 +266,37 @@ class BlockSender implements java.io.Closeable {
*/ */
DataChecksum csum = null; DataChecksum csum = null;
if (verifyChecksum || sendChecksum) { if (verifyChecksum || sendChecksum) {
final InputStream metaIn = datanode.data.getMetaDataInputStream(block); LengthInputStream metaIn = null;
if (!corruptChecksumOk || metaIn != null) { boolean keepMetaInOpen = false;
if (metaIn == null) { try {
//need checksum but meta-data not found metaIn = datanode.data.getMetaDataInputStream(block);
throw new FileNotFoundException("Meta-data not found for " + block); if (!corruptChecksumOk || metaIn != null) {
} if (metaIn == null) {
//need checksum but meta-data not found
throw new FileNotFoundException("Meta-data not found for " +
block);
}
checksumIn = new DataInputStream( // The meta file will contain only the header if the NULL checksum
new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); // type was used, or if the replica was written to transient storage.
// Checksum verification is not performed for replicas on transient
// storage. The header is important for determining the checksum
// type later when lazy persistence copies the block to non-transient
// storage and computes the checksum.
if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
checksumIn = new DataInputStream(new BufferedInputStream(
metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
// read and handle the common header here. For now just a version csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); keepMetaInOpen = true;
short version = header.getVersion(); }
if (version != BlockMetadataHeader.VERSION) { } else {
LOG.warn("Wrong version (" + version + ") for metadata file for " LOG.warn("Could not find metadata file for " + block);
+ block + " ignoring ..."); }
} finally {
if (!keepMetaInOpen) {
IOUtils.closeStream(metaIn);
} }
csum = header.getChecksum();
} else {
LOG.warn("Could not find metadata file for " + block);
} }
} }
if (csum == null) { if (csum == null) {
@ -343,7 +355,7 @@ class BlockSender implements java.io.Closeable {
endOffset = end; endOffset = end;
// seek to the right offsets // seek to the right offsets
if (offset > 0) { if (offset > 0 && checksumIn != null) {
long checksumSkip = (offset / chunkSize) * checksumSize; long checksumSkip = (offset / chunkSize) * checksumSize;
// note blockInStream is seeked when created below // note blockInStream is seeked when created below
if (checksumSkip > 0) { if (checksumSkip > 0) {
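As a reading aid, the new BlockSender setup can be summarized as: open the meta file, but only build a checksum stream when it contains CRCs beyond the header; otherwise there is nothing to verify for this replica. The helper below is hypothetical (the patch does this inline with a keepMetaInOpen flag), but it uses only calls that appear in the hunk above.

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.util.DataChecksum;

class BlockSenderChecksumSketch {
  // Hypothetical helper, not part of the patch.
  // Returns the on-disk checksum, or null when the meta file holds only the
  // header (NULL checksum type, or a replica written to transient storage).
  static DataChecksum readDiskChecksum(LengthInputStream metaIn,
      ExtendedBlock block) throws IOException {
    if (metaIn.getLength() <= BlockMetadataHeader.getHeaderSize()) {
      return null; // header only: skip checksum verification for this replica
    }
    DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
        metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
    return BlockMetadataHeader.readDataChecksum(checksumIn, block);
  }
}

When no checksum stream is created, the caller must also skip the seek into the meta file, which is why the offset > 0 seek above is now additionally guarded by checksumIn != null.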

@ -213,7 +213,7 @@ public ReplicaOutputStreams createStreams(boolean isCreate,
// the checksum that should actually be used -- this // the checksum that should actually be used -- this
// may differ from requestedChecksum for appends. // may differ from requestedChecksum for appends.
DataChecksum checksum; final DataChecksum checksum;
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
@ -250,7 +250,7 @@ public ReplicaOutputStreams createStreams(boolean isCreate,
} }
} }
} else { } else {
// for create, we can use the requested checksum // for create, we can use the requested checksum
checksum = requestedChecksum; checksum = requestedChecksum;
} }
@ -264,7 +264,8 @@ public ReplicaOutputStreams createStreams(boolean isCreate,
blockOut.getChannel().position(blockDiskSize); blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize); crcOut.getChannel().position(crcDiskSize);
} }
return new ReplicaOutputStreams(blockOut, crcOut, checksum); return new ReplicaOutputStreams(blockOut, crcOut, checksum,
getVolume().isTransientStorage());
} catch (IOException e) { } catch (IOException e) {
IOUtils.closeStream(blockOut); IOUtils.closeStream(blockOut);
IOUtils.closeStream(metaRAF); IOUtils.closeStream(metaRAF);

@ -32,16 +32,18 @@ public class ReplicaOutputStreams implements Closeable {
private final OutputStream dataOut; private final OutputStream dataOut;
private final OutputStream checksumOut; private final OutputStream checksumOut;
private final DataChecksum checksum; private final DataChecksum checksum;
private final boolean isTransientStorage;
/** /**
* Create an object with a data output stream, a checksum output stream * Create an object with a data output stream, a checksum output stream
* and a checksum. * and a checksum.
*/ */
public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut, public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
DataChecksum checksum) { DataChecksum checksum, boolean isTransientStorage) {
this.dataOut = dataOut; this.dataOut = dataOut;
this.checksumOut = checksumOut; this.checksumOut = checksumOut;
this.checksum = checksum; this.checksum = checksum;
this.isTransientStorage = isTransientStorage;
} }
/** @return the data output stream. */ /** @return the data output stream. */
@ -59,6 +61,11 @@ public DataChecksum getChecksum() {
return checksum; return checksum;
} }
/** @return is writing to a transient storage? */
public boolean isTransientStorage() {
return isTransientStorage;
}
@Override @Override
public void close() { public void close() {
IOUtils.closeStream(dataOut); IOUtils.closeStream(dataOut);

@ -593,13 +593,8 @@ private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
HdfsConstants.IO_FILE_BUFFER_SIZE)); HdfsConstants.IO_FILE_BUFFER_SIZE));
// read and handle the common header here. For now just a version // read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
short version = header.getVersion(); checksumIn, metaFile);
if (version != BlockMetadataHeader.VERSION) {
FsDatasetImpl.LOG.warn("Wrong version (" + version + ") for metadata file "
+ metaFile + " ignoring ...");
}
DataChecksum checksum = header.getChecksum();
int bytesPerChecksum = checksum.getBytesPerChecksum(); int bytesPerChecksum = checksum.getBytesPerChecksum();
int checksumSize = checksum.getChecksumSize(); int checksumSize = checksum.getChecksumSize();
long numChunks = Math.min( long numChunks = Math.min(

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -58,6 +60,7 @@
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -91,6 +94,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
@ -633,7 +637,7 @@ public InputStream getBlockInputStream(ExtendedBlock b,
* Get the meta info of a block stored in volumeMap. To find a block, * Get the meta info of a block stored in volumeMap. To find a block,
* block pool Id, block Id and generation stamp must match. * block pool Id, block Id and generation stamp must match.
* @param b extended block * @param b extended block
* @return the meta replica information; null if block was not found * @return the meta replica information
* @throws ReplicaNotFoundException if no entry is in the map or * @throws ReplicaNotFoundException if no entry is in the map or
* there is a generation stamp mismatch * there is a generation stamp mismatch
*/ */
@ -721,23 +725,80 @@ static File[] copyBlockFiles(long blockId, long genStamp,
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
final File dstFile = new File(destDir, srcFile.getName()); final File dstFile = new File(destDir, srcFile.getName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
try { computeChecksum(srcMeta, dstMeta, srcFile);
Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
} catch (IOException e) {
throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
}
try { try {
Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true); Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
} catch (IOException e) { } catch (IOException e) {
throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e); throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Copied " + srcMeta + " to " + dstMeta); LOG.debug("Copied " + srcMeta + " to " + dstMeta +
" and calculated checksum");
LOG.debug("Copied " + srcFile + " to " + dstFile); LOG.debug("Copied " + srcFile + " to " + dstFile);
} }
return new File[] {dstMeta, dstFile}; return new File[] {dstMeta, dstFile};
} }
/**
* Compute and store the checksum for a block file that does not already have
* its checksum computed.
*
* @param srcMeta source meta file, containing only the checksum header, not a
* calculated checksum
* @param dstMeta destination meta file, into which this method will write a
* full computed checksum
* @param blockFile block file for which the checksum will be computed
* @throws IOException
*/
private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
throws IOException {
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
final byte[] data = new byte[1 << 16];
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
DataOutputStream metaOut = null;
InputStream dataIn = null;
try {
File parentFile = dstMeta.getParentFile();
if (parentFile != null) {
if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
throw new IOException("Destination '" + parentFile
+ "' directory cannot be created");
}
}
metaOut = new DataOutputStream(new BufferedOutputStream(
new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE));
BlockMetadataHeader.writeHeader(metaOut, checksum);
dataIn = isNativeIOAvailable ?
NativeIO.getShareDeleteFileInputStream(blockFile) :
new FileInputStream(blockFile);
int offset = 0;
for(int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
if (n > 0) {
n += offset;
offset = n % checksum.getBytesPerChecksum();
final int length = n - offset;
if (length > 0) {
checksum.calculateChunkedSums(data, 0, length, crcs, 0);
metaOut.write(crcs, 0, checksum.getChecksumSize(length));
System.arraycopy(data, length, data, 0, offset);
}
}
}
// calculate and write the crc for the trailing partial chunk, if any
if (offset > 0) {
checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
metaOut.write(crcs, 0, checksum.getChecksumSize(offset));
}
} finally {
IOUtils.cleanup(LOG, dataIn, metaOut);
}
}
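For contrast with the streaming loop above, and assuming a block small enough to buffer in memory, the same checksums could be produced with a single call. The class and method names below are hypothetical; the sketch only illustrates the DataChecksum contract the loop is chunking around.

import org.apache.hadoop.util.DataChecksum;

class WholeBlockChecksumSketch {
  // Hypothetical helper, not part of the patch.
  // Produces one CRC per full (or trailing partial) bytesPerChecksum chunk.
  static byte[] checksumsFor(byte[] blockData, DataChecksum checksum) {
    byte[] crcs = new byte[checksum.getChecksumSize(blockData.length)];
    checksum.calculateChunkedSums(blockData, 0, blockData.length, crcs, 0);
    return crcs;
  }
}

Because computeChecksum cannot buffer the whole block, it keeps the trailing n % bytesPerChecksum bytes between reads and copies them back to the front of the buffer, so every calculateChunkedSums call starts on a chunk boundary and the final partial chunk is summed once, after the read loop ends.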
static private void truncateBlock(File blockFile, File metaFile, static private void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException { long oldlen, long newlen) throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile LOG.info("truncateBlock: blockFile=" + blockFile
@ -1640,6 +1701,7 @@ public void uncache(String bpid, long[] blockIds) {
} }
} }
@Override
public boolean isCached(String bpid, long blockId) { public boolean isCached(String bpid, long blockId) {
return cacheManager.isCached(bpid, blockId); return cacheManager.isCached(bpid, blockId);
} }
@ -2555,8 +2617,14 @@ private void evictBlocks() throws IOException {
// Before deleting the files from transient storage we must notify the // Before deleting the files from transient storage we must notify the
// NN that the files are on the new storage. Else a blockReport from // NN that the files are on the new storage. Else a blockReport from
// the transient storage might cause the NN to think the blocks are lost. // the transient storage might cause the NN to think the blocks are lost.
// Replicas must be evicted from client short-circuit caches, because the
// storage will no longer be transient, and thus will require validating
// checksum. This also stops a client from holding file descriptors,
// which would prevent the OS from reclaiming the memory.
ExtendedBlock extendedBlock = ExtendedBlock extendedBlock =
new ExtendedBlock(bpid, newReplicaInfo); new ExtendedBlock(bpid, newReplicaInfo);
datanode.getShortCircuitRegistry().processBlockInvalidation(
ExtendedBlockId.fromExtendedBlock(extendedBlock));
datanode.notifyNamenodeReceivedBlock( datanode.notifyNamenodeReceivedBlock(
extendedBlock, null, newReplicaInfo.getStorageUuid()); extendedBlock, null, newReplicaInfo.getStorageUuid());

@ -241,7 +241,7 @@ public void run() {
} catch (Exception e){ } catch (Exception e){
FsDatasetImpl.LOG.warn( FsDatasetImpl.LOG.warn(
"LazyWriter failed to async persist RamDisk block pool id: " "LazyWriter failed to async persist RamDisk block pool id: "
+ bpId + "block Id: " + blockId); + bpId + " block Id: " + blockId, e);
} finally { } finally {
if (!succeeded) { if (!succeeded) {
datanode.getFSDataset().onFailLazyPersist(bpId, blockId); datanode.getFSDataset().onFailLazyPersist(bpId, blockId);

@ -168,9 +168,9 @@ synchronized int numReplicasNotPersisted() {
@Override @Override
synchronized RamDiskReplicaLru getNextCandidateForEviction() { synchronized RamDiskReplicaLru getNextCandidateForEviction() {
Iterator it = replicasPersisted.values().iterator(); final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
while (it.hasNext()) { while (it.hasNext()) {
RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next(); final RamDiskReplicaLru ramDiskReplicaLru = it.next();
it.remove(); it.remove();
Map<Long, RamDiskReplicaLru> replicaMap = Map<Long, RamDiskReplicaLru> replicaMap =

@ -248,7 +248,8 @@ synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
+ theBlock); + theBlock);
} else { } else {
SimulatedOutputStream crcStream = new SimulatedOutputStream(); SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum); return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
volume.isTransientStorage());
} }
} }

@ -0,0 +1,389 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Rule;
import org.junit.rules.Timeout;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public abstract class LazyPersistTestCase {
static {
((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
protected static final int BUFFER_LENGTH = 4096;
protected static final int EVICTION_LOW_WATERMARK = 1;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
private static final String JMX_SERVICE_NAME = "DataNode";
protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
protected static final short REPL_FACTOR = 1;
protected MiniDFSCluster cluster;
protected DistributedFileSystem fs;
protected DFSClient client;
protected JMXGet jmx;
protected TemporarySocketDirectory sockDir;
@After
public void shutDownCluster() throws Exception {
// Dump all RamDisk JMX metrics before shutting down the cluster.
printRamDiskJMXMetrics();
if (fs != null) {
fs.close();
fs = null;
client = null;
}
if (cluster != null) {
cluster.shutdownDataNodes();
cluster.shutdown();
cluster = null;
}
jmx = null;
IOUtils.closeQuietly(sockDir);
sockDir = null;
}
@Rule
public Timeout timeout = new Timeout(300000);
protected final LocatedBlocks ensureFileReplicasOnStorageType(
Path path, StorageType storageType) throws IOException {
// Ensure that the block locations returned are correct!
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
}
return locatedBlocks;
}
protected final void makeRandomTestFile(Path path, long length,
boolean isLazyPersist, long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
BLOCK_SIZE, REPL_FACTOR, seed, true);
}
protected final void makeTestFile(Path path, long length,
boolean isLazyPersist) throws IOException {
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
if (isLazyPersist) {
createFlags.add(LAZY_PERSIST);
}
FSDataOutputStream fos = null;
try {
fos =
fs.create(path,
FsPermission.getFileDefault(),
createFlags,
BUFFER_LENGTH,
REPL_FACTOR,
BLOCK_SIZE,
null);
// Allocate a block.
byte[] buffer = new byte[BUFFER_LENGTH];
for (int bytesWritten = 0; bytesWritten < length; ) {
fos.write(buffer, 0, buffer.length);
bytesWritten += buffer.length;
}
if (length > 0) {
fos.hsync();
}
} finally {
IOUtils.closeQuietly(fos);
}
}
/**
* If ramDiskReplicaCapacity is >= 0, then RAM_DISK capacity is artificially
* capped to that many replicas. If ramDiskReplicaCapacity < 0 then it is ignored.
*/
protected final void startUpCluster(boolean hasTransientStorage,
final int ramDiskReplicaCapacity,
final boolean useSCR,
final boolean useLegacyBlockReaderLocal)
throws IOException {
Configuration conf = new Configuration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
EVICTION_LOW_WATERMARK * BLOCK_SIZE);
if (useSCR) {
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
// Do not share a client context across tests.
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
if (useLegacyBlockReaderLocal) {
conf.setBoolean(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
} else {
sockDir = new TemporarySocketDirectory();
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
}
}
long[] capacities = null;
if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
// Convert replica count to byte count, add some delta for .meta and
// VERSION files.
long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) +
(BLOCK_SIZE - 1);
capacities = new long[] { ramDiskStorageLimit, -1 };
}
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(REPL_FACTOR)
.storageCapacities(capacities)
.storageTypes(hasTransientStorage ?
new StorageType[]{ RAM_DISK, DEFAULT } : null)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
try {
jmx = initJMX();
} catch (Exception e) {
fail("Failed to initialize JMX for testing: " + e);
}
LOG.info("Cluster startup complete");
}
/**
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
* capped. If ramDiskStorageLimit < 0 then it is ignored.
*/
protected final void startUpCluster(final int numDataNodes,
final StorageType[] storageTypes,
final long ramDiskStorageLimit,
final boolean useSCR)
throws IOException {
Configuration conf = new Configuration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
if (useSCR) {
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
sockDir = new TemporarySocketDirectory();
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
}
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(numDataNodes)
.storageTypes(storageTypes != null ?
storageTypes : new StorageType[] { DEFAULT, DEFAULT })
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
// Artificially cap the storage capacity of the RAM_DISK volume.
if (ramDiskStorageLimit >= 0) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageType() == RAM_DISK) {
((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
}
}
}
LOG.info("Cluster startup complete");
}
protected final void startUpCluster(boolean hasTransientStorage,
final int ramDiskReplicaCapacity)
throws IOException {
startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false, false);
}
protected final void triggerBlockReport()
throws IOException, InterruptedException {
// Trigger block report to NN
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
Thread.sleep(10 * 1000);
}
protected final boolean verifyBlockDeletedFromDir(File dir,
LocatedBlocks locatedBlocks) {
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
File targetDir =
DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
File metaFile = new File(targetDir,
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
lb.getBlock().getGenerationStamp()));
if (metaFile.exists()) {
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
}
return true;
}
protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
throws IOException, InterruptedException {
LOG.info("Verifying replica has no saved copy after deletion.");
triggerBlockReport();
while(
DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
> 0L){
Thread.sleep(1000);
}
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
// Make sure deleted replica does not have a copy on either finalized dir of
// transient volume or finalized dir of non-transient volume
for (FsVolumeSpi v : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) v;
File targetDir = (v.isTransientStorage()) ?
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
return false;
}
}
return true;
}
protected final void verifyRamDiskJMXMetric(String metricName,
long expectedValue) throws Exception {
assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
}
protected final boolean verifyReadRandomFile(
Path path, int fileLength, int seed) throws IOException {
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(seed, fileLength);
return Arrays.equals(contents, expected);
}
private JMXGet initJMX() throws Exception {
JMXGet jmx = new JMXGet();
jmx.setService(JMX_SERVICE_NAME);
jmx.init();
return jmx;
}
private void printRamDiskJMXMetrics() {
try {
if (jmx != null) {
jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
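To show how the extracted helpers are meant to be combined, here is a hypothetical subclass modeled on testPlacementOnRamDisk further down; it is not part of the patch.

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;

public class LazyPersistUsageSketch extends LazyPersistTestCase {
  // Hypothetical test, not part of the patch.
  @Test
  public void testWriteGoesToRamDisk() throws IOException {
    startUpCluster(true, -1);                 // one DN with a RAM_DISK volume
    final String method = GenericTestUtils.getMethodName();
    final Path path = new Path("/" + method + ".01.dat");
    makeTestFile(path, BLOCK_SIZE, true);     // CREATE + LAZY_PERSIST
    ensureFileReplicasOnStorageType(path, RAM_DISK);
  }
}

TestLazyPersistFiles and TestScrLazyPersistFiles below follow this pattern, which is why their private copies of the cluster setup, JMX plumbing, and verification helpers could be deleted in the hunks that follow.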

@ -17,103 +17,45 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.*; import java.io.File;
import java.util.*; import java.io.IOException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT; import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK; import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not; import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
public class TestLazyPersistFiles { public class TestLazyPersistFiles extends LazyPersistTestCase {
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
static {
((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15; private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
private static final int THREADPOOL_SIZE = 10; private static final int THREADPOOL_SIZE = 10;
private static final short REPL_FACTOR = 1; @Test
private static final int BLOCK_SIZE = 5 * 1024 * 1024;
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
private static final int LAZY_WRITER_INTERVAL_SEC = 1;
private static final int BUFFER_LENGTH = 4096;
private static final int EVICTION_LOW_WATERMARK = 1;
private static final String JMX_SERVICE_NAME = "DataNode";
private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DFSClient client;
private Configuration conf;
private JMXGet jmx;
@After
public void shutDownCluster() throws Exception {
// Dump all RamDisk JMX metrics before shutdown the cluster
printRamDiskJMXMetrics();
if (fs != null) {
fs.close();
fs = null;
client = null;
}
if (cluster != null) {
cluster.shutdownDataNodes();
cluster.shutdown();
cluster = null;
}
if (jmx != null) {
jmx = null;
}
}
@Test (timeout=300000)
public void testPolicyNotSetByDefault() throws IOException { public void testPolicyNotSetByDefault() throws IOException {
startUpCluster(false, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -126,7 +68,7 @@ public void testPolicyNotSetByDefault() throws IOException {
assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID)); assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID));
} }
@Test (timeout=300000) @Test
public void testPolicyPropagation() throws IOException { public void testPolicyPropagation() throws IOException {
startUpCluster(false, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -138,7 +80,7 @@ public void testPolicyPropagation() throws IOException {
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID)); assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
} }
@Test (timeout=300000) @Test
public void testPolicyPersistenceInEditLog() throws IOException { public void testPolicyPersistenceInEditLog() throws IOException {
startUpCluster(false, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -152,7 +94,7 @@ public void testPolicyPersistenceInEditLog() throws IOException {
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID)); assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
} }
@Test (timeout=300000) @Test
public void testPolicyPersistenceInFsImage() throws IOException { public void testPolicyPersistenceInFsImage() throws IOException {
startUpCluster(false, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -170,7 +112,7 @@ public void testPolicyPersistenceInFsImage() throws IOException {
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID)); assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
} }
@Test (timeout=300000) @Test
public void testPlacementOnRamDisk() throws IOException { public void testPlacementOnRamDisk() throws IOException {
startUpCluster(true, -1); startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -180,7 +122,7 @@ public void testPlacementOnRamDisk() throws IOException {
ensureFileReplicasOnStorageType(path, RAM_DISK); ensureFileReplicasOnStorageType(path, RAM_DISK);
} }
@Test (timeout=300000) @Test
public void testPlacementOnSizeLimitedRamDisk() throws IOException { public void testPlacementOnSizeLimitedRamDisk() throws IOException {
startUpCluster(true, 3); startUpCluster(true, 3);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -199,7 +141,7 @@ public void testPlacementOnSizeLimitedRamDisk() throws IOException {
* Write should default to disk. No error. * Write should default to disk. No error.
* @throws IOException * @throws IOException
*/ */
@Test (timeout=300000) @Test
public void testFallbackToDisk() throws IOException { public void testFallbackToDisk() throws IOException {
startUpCluster(false, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -213,7 +155,7 @@ public void testFallbackToDisk() throws IOException {
* File can not fit in RamDisk even with eviction * File can not fit in RamDisk even with eviction
* @throws IOException * @throws IOException
*/ */
@Test (timeout=300000) @Test
public void testFallbackToDiskFull() throws Exception { public void testFallbackToDiskFull() throws Exception {
startUpCluster(false, 0); startUpCluster(false, 0);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -231,7 +173,7 @@ public void testFallbackToDiskFull() throws Exception {
* Expect 2 or less blocks are on RamDisk and 3 or more on disk. * Expect 2 or less blocks are on RamDisk and 3 or more on disk.
* @throws IOException * @throws IOException
*/ */
@Test (timeout=300000) @Test
public void testFallbackToDiskPartial() public void testFallbackToDiskPartial()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(true, 2); startUpCluster(true, 2);
@ -271,7 +213,7 @@ public void testFallbackToDiskPartial()
* *
* @throws IOException * @throws IOException
*/ */
@Test (timeout=300000) @Test
public void testRamDiskNotChosenByDefault() throws IOException { public void testRamDiskNotChosenByDefault() throws IOException {
startUpCluster(true, -1); startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -289,7 +231,7 @@ public void testRamDiskNotChosenByDefault() throws IOException {
* Append to lazy persist file is denied. * Append to lazy persist file is denied.
* @throws IOException * @throws IOException
*/ */
@Test (timeout=300000) @Test
public void testAppendIsDenied() throws IOException { public void testAppendIsDenied() throws IOException {
startUpCluster(true, -1); startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -310,7 +252,7 @@ public void testAppendIsDenied() throws IOException {
* must be discarded by the NN, instead of being kept around as a * must be discarded by the NN, instead of being kept around as a
* 'corrupt' file. * 'corrupt' file.
*/ */
@Test (timeout=300000) @Test
public void testLazyPersistFilesAreDiscarded() public void testLazyPersistFilesAreDiscarded()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(true, 2); startUpCluster(true, 2);
@ -344,7 +286,7 @@ public void testLazyPersistFilesAreDiscarded()
is(0L)); is(0L));
} }
@Test (timeout=300000) @Test
public void testLazyPersistBlocksAreSaved() public void testLazyPersistBlocksAreSaved()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(true, -1); startUpCluster(true, -1);
@ -399,7 +341,7 @@ public void testLazyPersistBlocksAreSaved()
* RamDisk eviction after lazy persist to disk. * RamDisk eviction after lazy persist to disk.
* @throws Exception * @throws Exception
*/ */
@Test (timeout=300000) @Test
public void testRamDiskEviction() throws Exception { public void testRamDiskEviction() throws Exception {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK); startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -434,7 +376,7 @@ public void testRamDiskEviction() throws Exception {
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
@Test (timeout=300000) @Test
public void testRamDiskEvictionBeforePersist() public void testRamDiskEvictionBeforePersist()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(true, 1); startUpCluster(true, 1);
@ -459,7 +401,7 @@ public void testRamDiskEvictionBeforePersist()
assert(fs.exists(path1)); assert(fs.exists(path1));
assert(fs.exists(path2)); assert(fs.exists(path2));
verifyReadRandomFile(path1, BLOCK_SIZE, SEED); assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
} }
/** /**
@ -467,7 +409,7 @@ public void testRamDiskEvictionBeforePersist()
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
@Test (timeout=300000) @Test
public void testRamDiskEvictionIsLru() public void testRamDiskEvictionIsLru()
throws Exception { throws Exception {
final int NUM_PATHS = 5; final int NUM_PATHS = 5;
@ -529,7 +471,7 @@ public void testRamDiskEvictionIsLru()
* Memory is freed up and file is gone. * Memory is freed up and file is gone.
* @throws IOException * @throws IOException
*/ */
@Test // (timeout=300000) @Test
public void testDeleteBeforePersist() public void testDeleteBeforePersist()
throws Exception { throws Exception {
startUpCluster(true, -1); startUpCluster(true, -1);
@ -556,7 +498,7 @@ public void testDeleteBeforePersist()
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
@Test (timeout=300000) @Test
public void testDeleteAfterPersist() public void testDeleteAfterPersist()
throws Exception { throws Exception {
startUpCluster(true, -1); startUpCluster(true, -1);
@ -584,7 +526,7 @@ public void testDeleteAfterPersist()
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
@Test (timeout=300000) @Test
public void testDfsUsageCreateDelete() public void testDfsUsageCreateDelete()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(true, 4); startUpCluster(true, 4);
@ -615,7 +557,7 @@ public void testDfsUsageCreateDelete()
/** /**
* Concurrent read from the same node and verify the contents. * Concurrent read from the same node and verify the contents.
*/ */
@Test (timeout=300000) @Test
public void testConcurrentRead() public void testConcurrentRead()
throws Exception { throws Exception {
startUpCluster(true, 2); startUpCluster(true, 2);
@ -666,7 +608,7 @@ public void run() {
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
@Test (timeout=300000) @Test
public void testConcurrentWrites() public void testConcurrentWrites()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(true, 9); startUpCluster(true, 9);
@ -702,7 +644,7 @@ public void testConcurrentWrites()
assertThat(testFailed.get(), is(false)); assertThat(testFailed.get(), is(false));
} }
@Test (timeout=300000) @Test
public void testDnRestartWithSavedReplicas() public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -726,7 +668,7 @@ public void testDnRestartWithSavedReplicas()
ensureFileReplicasOnStorageType(path1, DEFAULT); ensureFileReplicasOnStorageType(path1, DEFAULT);
} }
@Test (timeout=300000) @Test
public void testDnRestartWithUnsavedReplicas() public void testDnRestartWithUnsavedReplicas()
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -746,183 +688,6 @@ public void testDnRestartWithUnsavedReplicas()
ensureFileReplicasOnStorageType(path1, RAM_DISK); ensureFileReplicasOnStorageType(path1, RAM_DISK);
} }
// ---- Utility functions for all test cases -------------------------------
/**
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
* capped. If ramDiskStorageLimit < 0 then it is ignored.
*/
private void startUpCluster(boolean hasTransientStorage,
final int ramDiskReplicaCapacity,
final boolean useSCR)
throws IOException {
conf = new Configuration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
EVICTION_LOW_WATERMARK * BLOCK_SIZE);
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
long[] capacities = null;
if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
// Convert replica count to byte count, add some delta for .meta and VERSION files.
long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
capacities = new long[] { ramDiskStorageLimit, -1 };
}
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(REPL_FACTOR)
.storageCapacities(capacities)
.storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
try {
jmx = initJMX();
} catch (Exception e) {
fail("Failed initialize JMX for testing: " + e);
}
LOG.info("Cluster startup complete");
}
private void startUpCluster(boolean hasTransientStorage,
final int ramDiskReplicaCapacity)
throws IOException {
startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
}
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
throws IOException {
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
if (isLazyPersist) {
createFlags.add(LAZY_PERSIST);
}
FSDataOutputStream fos = null;
try {
fos =
fs.create(path,
FsPermission.getFileDefault(),
createFlags,
BUFFER_LENGTH,
REPL_FACTOR,
BLOCK_SIZE,
null);
// Allocate a block.
byte[] buffer = new byte[BUFFER_LENGTH];
for (int bytesWritten = 0; bytesWritten < length; ) {
fos.write(buffer, 0, buffer.length);
bytesWritten += buffer.length;
}
if (length > 0) {
fos.hsync();
}
} finally {
IOUtils.closeQuietly(fos);
}
}
private LocatedBlocks ensureFileReplicasOnStorageType(
Path path, StorageType storageType) throws IOException {
// Ensure that returned block locations returned are correct!
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
}
return locatedBlocks;
}
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
BLOCK_SIZE, REPL_FACTOR, seed, true);
}
private boolean verifyReadRandomFile(
Path path, int fileLength, int seed) throws IOException {
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(seed, fileLength);
return Arrays.equals(contents, expected);
}
private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
throws IOException, InterruptedException {
LOG.info("Verifying replica has no saved copy after deletion.");
triggerBlockReport();
while(
DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
> 0L){
Thread.sleep(1000);
}
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
// Make sure deleted replica does not have a copy on either finalized dir of
// transient volume or finalized dir of non-transient volume
for (FsVolumeSpi v : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) v;
File targetDir = (v.isTransientStorage()) ?
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
return false;
}
}
return true;
}
private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
File targetDir =
DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
File metaFile = new File(targetDir,
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
lb.getBlock().getGenerationStamp()));
if (metaFile.exists()) {
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
}
return true;
}
private void triggerBlockReport()
throws IOException, InterruptedException {
// Trigger block report to NN
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
Thread.sleep(10 * 1000);
}
class WriterRunnable implements Runnable { class WriterRunnable implements Runnable {
private final int id; private final int id;
private final Path paths[]; private final Path paths[];
@ -960,27 +725,4 @@ public void run() {
} }
} }
} }
JMXGet initJMX() throws Exception
{
JMXGet jmx = new JMXGet();
jmx.setService(JMX_SERVICE_NAME);
jmx.init();
return jmx;
}
void printRamDiskJMXMetrics() {
try {
if (jmx != null) {
jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
}
} catch (Exception e) {
e.printStackTrace();
}
}
void verifyRamDiskJMXMetric(String metricName, long expectedValue)
throws Exception {
assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
}
} }

@ -15,84 +15,44 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.TemporarySocketDirectory;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.log4j.Level;
-import org.junit.*;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.UUID;
-import static org.apache.hadoop.fs.CreateFlag.CREATE;
-import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
-import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-public class TestScrLazyPersistFiles {
public class TestScrLazyPersistFiles extends LazyPersistTestCase {
-public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
-static {
-((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
-((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
-((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
-}
-private static short REPL_FACTOR = 1;
-private static final int BLOCK_SIZE = 10485760; // 10 MB
-private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
-private static final long HEARTBEAT_INTERVAL_SEC = 1;
-private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-private static final int LAZY_WRITER_INTERVAL_SEC = 1;
-private static final int BUFFER_LENGTH = 4096;
-private static TemporarySocketDirectory sockDir;
-private MiniDFSCluster cluster;
-private DistributedFileSystem fs;
-private DFSClient client;
-private Configuration conf;
@BeforeClass
public static void init() {
-sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
}
-@AfterClass
-public static void shutdown() throws IOException {
-sockDir.close();
-}
@Before
public void before() {
Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
@ -100,26 +60,14 @@ public void before() {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
}
-@After
-public void shutDownCluster() throws IOException {
-if (fs != null) {
-fs.close();
-fs = null;
-client = null;
-}
-if (cluster != null) {
-cluster.shutdownDataNodes();
-cluster.shutdown();
-cluster = null;
-}
-}
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Read in-memory block with Short Circuit Read
* Note: the test uses faked RAM_DISK from physical disk.
*/
-@Test (timeout=300000)
@Test
public void testRamDiskShortCircuitRead()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
@ -160,7 +108,7 @@ public void testRamDiskShortCircuitRead()
* @throws IOException
* @throws InterruptedException
*/
-@Test (timeout=300000000)
@Test
public void testRamDiskEvictionWithShortCircuitReadHandle()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
@ -204,123 +152,149 @@ public void testRamDiskEvictionWithShortCircuitReadHandle()
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
-// ---- Utility functions for all test cases -------------------------------
-/**
-* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
-* capped. If ramDiskStorageLimit < 0 then it is ignored.
-*/
-private void startUpCluster(final int numDataNodes,
-final StorageType[] storageTypes,
-final long ramDiskStorageLimit,
-final boolean useSCR)
-throws IOException {
-conf = new Configuration();
-conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
-LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
-conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
-conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-HEARTBEAT_RECHECK_INTERVAL_MSEC);
-conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
-LAZY_WRITER_INTERVAL_SEC);
-if (useSCR)
-{
-conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
-conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
-UUID.randomUUID().toString());
-conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
-new File(sockDir.getDir(),
-"TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath());
-conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-UserGroupInformation.getCurrentUser().getShortUserName());
-}
-REPL_FACTOR = 1; //Reset in case a test has modified the value
-cluster = new MiniDFSCluster
-.Builder(conf)
-.numDataNodes(numDataNodes)
-.storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
-.build();
-fs = cluster.getFileSystem();
-client = fs.getClient();
-// Artificially cap the storage capacity of the RAM_DISK volume.
-if (ramDiskStorageLimit >= 0) {
-List<? extends FsVolumeSpi> volumes =
-cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-for (FsVolumeSpi volume : volumes) {
-if (volume.getStorageType() == RAM_DISK) {
-((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
-}
-}
-}
-LOG.info("Cluster startup complete");
-}
-private void makeTestFile(Path path, long length, final boolean isLazyPersist)
-throws IOException {
-EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
-if (isLazyPersist) {
-createFlags.add(LAZY_PERSIST);
-}
-FSDataOutputStream fos = null;
-try {
-fos =
-fs.create(path,
-FsPermission.getFileDefault(),
-createFlags,
-BUFFER_LENGTH,
-REPL_FACTOR,
-BLOCK_SIZE,
-null);
-// Allocate a block.
-byte[] buffer = new byte[BUFFER_LENGTH];
-for (int bytesWritten = 0; bytesWritten < length; ) {
-fos.write(buffer, 0, buffer.length);
-bytesWritten += buffer.length;
-}
-if (length > 0) {
-fos.hsync();
-}
-} finally {
-IOUtils.closeQuietly(fos);
-}
-}
-private LocatedBlocks ensureFileReplicasOnStorageType(
-Path path, StorageType storageType) throws IOException {
-// Ensure that returned block locations returned are correct!
-LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
-assertThat(fs.exists(path), is(true));
-long fileLength = client.getFileInfo(path.toString()).getLen();
-LocatedBlocks locatedBlocks =
-client.getLocatedBlocks(path.toString(), 0, fileLength);
-for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
-assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
-}
-return locatedBlocks;
-}
-private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
-long seed) throws IOException {
-DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
-BLOCK_SIZE, REPL_FACTOR, seed, true);
-}
-private void triggerBlockReport()
-throws IOException, InterruptedException {
-// Trigger block report to NN
-DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-Thread.sleep(10 * 1000);
-}
@Test
public void testShortCircuitReadAfterEviction()
throws IOException, InterruptedException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
doShortCircuitReadAfterEvictionTest();
}
@Test
public void testLegacyShortCircuitReadAfterEviction()
throws IOException, InterruptedException {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
doShortCircuitReadAfterEvictionTest();
}
private void doShortCircuitReadAfterEvictionTest() throws IOException,
InterruptedException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
// Verify short-circuit read from RAM_DISK.
ensureFileReplicasOnStorageType(path1, RAM_DISK);
File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
// Sleep for a short time to allow the lazy writer thread to do its job.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
// Verify short-circuit read from RAM_DISK once again.
ensureFileReplicasOnStorageType(path1, RAM_DISK);
metaFile = MiniDFSCluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
// Create another file with a replica on RAM_DISK, which evicts the first.
makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
// Verify short-circuit read still works from DEFAULT storage. This time,
// we'll have a checksum written during lazy persistence.
ensureFileReplicasOnStorageType(path1, DEFAULT);
metaFile = MiniDFSCluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
// In the implementation of legacy short-circuit reads, any failure is
// trapped silently, reverts back to a remote read, and also disables all
// subsequent legacy short-circuit reads in the ClientContext. If the test
// uses legacy, then assert that it didn't get disabled.
ClientContext clientContext = client.getClientContext();
if (clientContext.getUseLegacyBlockReaderLocal()) {
Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
}
}
@Test
public void testShortCircuitReadBlockFileCorruption() throws IOException,
InterruptedException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
doShortCircuitReadBlockFileCorruptionTest();
}
@Test
public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
InterruptedException {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
doShortCircuitReadBlockFileCorruptionTest();
}
public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
InterruptedException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Create another file with a replica on RAM_DISK, which evicts the first.
makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
// Sleep for a short time to allow the lazy writer thread to do its job.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
// Corrupt the lazy-persisted block file, and verify that checksum
// verification catches it.
ensureFileReplicasOnStorageType(path1, DEFAULT);
MiniDFSCluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
exception.expect(ChecksumException.class);
DFSTestUtil.readFileBuffer(fs, path1);
}
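// Illustrative sketch only, not part of this change: the meta file length
// assertions in doShortCircuitReadAfterEvictionTest above reduce to this
// arithmetic. The 512-byte chunk size and 4-byte CRC width are assumed
// defaults, not values read from the test configuration.
private static long exampleExpectedMetaFileLength(long blockLength) {
long headerSize = BlockMetadataHeader.getHeaderSize(); // header only, before checksums are persisted
int bytesPerChecksum = 512; // assumed dfs.bytes-per-checksum
int checksumSize = 4; // assumed CRC32C width
long numChunks = (blockLength + bytesPerChecksum - 1) / bytesPerChecksum;
return headerSize + numChunks * checksumSize; // expected size once lazy persistence writes checksums
}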
@Test
public void testShortCircuitReadMetaFileCorruption() throws IOException,
InterruptedException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
doShortCircuitReadMetaFileCorruptionTest();
}
@Test
public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
InterruptedException {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
doShortCircuitReadMetaFileCorruptionTest();
}
public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
InterruptedException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Create another file with a replica on RAM_DISK, which evicts the first.
makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
// Sleep for a short time to allow the lazy writer thread to do its job.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
// Corrupt the lazy-persisted checksum file, and verify that checksum
// verification catches it.
ensureFileReplicasOnStorageType(path1, DEFAULT);
File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
MiniDFSCluster.corruptBlock(metaFile);
exception.expect(ChecksumException.class);
DFSTestUtil.readFileBuffer(fs, path1);
} }
}