HDFS-14402. Use FileChannel.transferTo() method for transferring block to SCM cache. Contributed by Feilong He.

This commit is contained in:
Rakesh Radhakrishnan 2019-05-26 14:30:11 +05:30
parent 55e0c134f0
commit 37900c5639
3 changed files with 75 additions and 153 deletions

View File

@ -18,10 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.util.DataChecksum;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -107,6 +113,59 @@ void shutdown() {
// Do nothing. // Do nothing.
} }
/**
* Verifies the block's checksum. This is an I/O intensive operation.
*/
protected void verifyChecksum(long length, FileInputStream metaIn,
FileChannel blockChannel, String blockFileName)
throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =
BlockMetadataHeader.readHeader(new DataInputStream(
new BufferedInputStream(metaIn, BlockMetadataHeader
.getHeaderSize())));
FileChannel metaChannel = null;
try {
metaChannel = metaIn.getChannel();
if (metaChannel == null) {
throw new IOException(
"Block InputStream meta file has no FileChannel.");
}
DataChecksum checksum = header.getChecksum();
final int bytesPerChecksum = checksum.getBytesPerChecksum();
final int checksumSize = checksum.getChecksumSize();
final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
// Verify the checksum
int bytesVerified = 0;
while (bytesVerified < length) {
Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
"Unexpected partial chunk before EOF");
assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
throw new IOException("checksum verification failed: premature EOF");
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
checksumBuf.limit(chunks * checksumSize);
fillBuffer(metaChannel, checksumBuf);
checksumBuf.flip();
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
bytesVerified);
// Success
bytesVerified += bytesRead;
blockBuf.clear();
checksumBuf.clear();
}
} finally {
IOUtils.closeQuietly(metaChannel);
}
}
/** /**
* Reads bytes into a buffer until EOF or the buffer's limit is reached. * Reads bytes into a buffer until EOF or the buffer's limit is reached.
*/ */

View File

@ -18,22 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
@ -98,59 +92,6 @@ MappableBlock load(long length, FileInputStream blockIn,
return mappableBlock; return mappableBlock;
} }
/**
* Verifies the block's checksum. This is an I/O intensive operation.
*/
private void verifyChecksum(long length, FileInputStream metaIn,
FileChannel blockChannel, String blockFileName)
throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =
BlockMetadataHeader.readHeader(new DataInputStream(
new BufferedInputStream(metaIn, BlockMetadataHeader
.getHeaderSize())));
FileChannel metaChannel = null;
try {
metaChannel = metaIn.getChannel();
if (metaChannel == null) {
throw new IOException(
"Block InputStream meta file has no FileChannel.");
}
DataChecksum checksum = header.getChecksum();
final int bytesPerChecksum = checksum.getBytesPerChecksum();
final int checksumSize = checksum.getChecksumSize();
final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
// Verify the checksum
int bytesVerified = 0;
while (bytesVerified < length) {
Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
"Unexpected partial chunk before EOF");
assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
throw new IOException("checksum verification failed: premature EOF");
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
checksumBuf.limit(chunks * checksumSize);
fillBuffer(metaChannel, checksumBuf);
checksumBuf.flip();
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
bytesVerified);
// Success
bytesVerified += bytesRead;
blockBuf.clear();
checksumBuf.clear();
}
} finally {
IOUtils.closeQuietly(metaChannel);
}
}
@Override @Override
public long getCacheUsed() { public long getCacheUsed() {
return memCacheStats.getCacheUsed(); return memCacheStats.getCacheUsed();

View File

@ -18,25 +18,17 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DNConf; import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
/** /**
@ -79,112 +71,42 @@ void initialize(FsDatasetCache cacheManager) throws IOException {
*/ */
@Override @Override
MappableBlock load(long length, FileInputStream blockIn, MappableBlock load(long length, FileInputStream blockIn,
FileInputStream metaIn, String blockFileName, FileInputStream metaIn, String blockFileName,
ExtendedBlockId key) ExtendedBlockId key)
throws IOException { throws IOException {
PmemMappedBlock mappableBlock = null; PmemMappedBlock mappableBlock = null;
String filePath = null; String cachePath = null;
FileChannel blockChannel = null; FileChannel blockChannel = null;
RandomAccessFile file = null; RandomAccessFile cacheFile = null;
MappedByteBuffer out = null;
try { try {
blockChannel = blockIn.getChannel(); blockChannel = blockIn.getChannel();
if (blockChannel == null) { if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel."); throw new IOException("Block InputStream has no FileChannel.");
} }
cachePath = pmemVolumeManager.getCachePath(key);
cacheFile = new RandomAccessFile(cachePath, "rw");
blockChannel.transferTo(0, length, cacheFile.getChannel());
// Verify checksum for the cached data instead of block file.
// The file channel should be repositioned.
cacheFile.getChannel().position(0);
verifyChecksum(length, metaIn, cacheFile.getChannel(), blockFileName);
filePath = pmemVolumeManager.getCachePath(key);
file = new RandomAccessFile(filePath, "rw");
out = file.getChannel().
map(FileChannel.MapMode.READ_WRITE, 0, length);
if (out == null) {
throw new IOException("Failed to map the block " + blockFileName +
" to persistent storage.");
}
verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
blockFileName);
mappableBlock = new PmemMappedBlock(length, key); mappableBlock = new PmemMappedBlock(length, key);
LOG.info("Successfully cached one replica:{} into persistent memory" LOG.info("Successfully cached one replica:{} into persistent memory"
+ ", [cached path={}, length={}]", key, filePath, length); + ", [cached path={}, length={}]", key, cachePath, length);
} finally { } finally {
IOUtils.closeQuietly(blockChannel); IOUtils.closeQuietly(blockChannel);
if (out != null) { IOUtils.closeQuietly(cacheFile);
NativeIO.POSIX.munmap(out);
}
IOUtils.closeQuietly(file);
if (mappableBlock == null) { if (mappableBlock == null) {
LOG.debug("Delete {} due to unsuccessful mapping.", filePath); LOG.debug("Delete {} due to unsuccessful mapping.", cachePath);
FsDatasetUtil.deleteMappedFile(filePath); FsDatasetUtil.deleteMappedFile(cachePath);
} }
} }
return mappableBlock; return mappableBlock;
} }
/**
* Verifies the block's checksum meanwhile maps block to persistent memory.
* This is an I/O intensive operation.
*/
private void verifyChecksumAndMapBlock(
MappedByteBuffer out, long length, FileInputStream metaIn,
FileChannel blockChannel, String blockFileName)
throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =
BlockMetadataHeader.readHeader(new DataInputStream(
new BufferedInputStream(metaIn, BlockMetadataHeader
.getHeaderSize())));
FileChannel metaChannel = null;
try {
metaChannel = metaIn.getChannel();
if (metaChannel == null) {
throw new IOException("Cannot get FileChannel from " +
"Block InputStream meta file.");
}
DataChecksum checksum = header.getChecksum();
final int bytesPerChecksum = checksum.getBytesPerChecksum();
final int checksumSize = checksum.getChecksumSize();
final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
// Verify the checksum
int bytesVerified = 0;
while (bytesVerified < length) {
Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
"Unexpected partial chunk before EOF");
assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
throw new IOException(
"Checksum verification failed for the block " + blockFileName +
": premature EOF");
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
checksumBuf.limit(chunks * checksumSize);
fillBuffer(metaChannel, checksumBuf);
checksumBuf.flip();
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
bytesVerified);
// / Copy data to persistent file
out.put(blockBuf);
// positioning the
bytesVerified += bytesRead;
// Clear buffer
blockBuf.clear();
checksumBuf.clear();
}
// Forces to write data to storage device containing the mapped file
out.force();
} finally {
IOUtils.closeQuietly(metaChannel);
}
}
@Override @Override
public long getCacheUsed() { public long getCacheUsed() {
return pmemVolumeManager.getCacheUsed(); return pmemVolumeManager.getCacheUsed();