HDFS-8668. Erasure Coding: revisit buffer used for encoding and decoding. Contributed by Sammi Chen

This commit is contained in:
Kai Zheng 2016-08-13 13:52:37 +08:00
parent 4d3ea92f4f
commit b5af9be72c
15 changed files with 169 additions and 34 deletions

View File

@ -101,6 +101,7 @@ public synchronized ByteBuffer getBuffer(boolean direct, int length) {
@Override
public synchronized void putBuffer(ByteBuffer buffer) {
buffer.clear();
TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
while (true) {
Key key = new Key(buffer.capacity(), System.nanoTime());

View File

@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference;
@ -393,11 +394,47 @@ protected TraceScope createWriteTraceScope() {
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
writeChunkPrepare(len, ckoff, cklen);
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.incNumChunks();
getStreamer().incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
}
/* write the data chunk in <code>buffer</code> staring at
* <code>buffer.position</code> with
* a length of <code>len > 0</code>, and its checksum
*/
protected synchronized void writeChunk(ByteBuffer buffer, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
writeChunkPrepare(len, ckoff, cklen);
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(buffer, len);
currentPacket.incNumChunks();
getStreamer().incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
}
private synchronized void writeChunkPrepare(int buflen,
int ckoff, int cklen) throws IOException {
dfsClient.checkOpen();
checkClosed();
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
if (buflen > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + buflen +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
@ -414,17 +451,6 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
currentPacket.getSeqno(), src, packetSize, chunksPerPacket,
getStreamer().getBytesCurBlock() + ", " + this);
}
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.incNumChunks();
getStreamer().incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
}
void enqueueCurrentPacket() throws IOException {

View File

@ -35,12 +35,12 @@
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.util.DirectBufferPool;
import java.io.EOFException;
import java.io.IOException;
@ -139,7 +139,7 @@ void skip() {
}
}
private static final DirectBufferPool bufferPool = new DirectBufferPool();
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
private final BlockReaderInfo[] blockReaders;
private final int cellSize;
@ -194,9 +194,14 @@ void skip() {
}
}
private boolean useDirectBuffer() {
return decoder.preferDirectBuffer();
}
private void resetCurStripeBuffer() {
if (curStripeBuf == null) {
curStripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum);
curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
cellSize * dataBlkNum);
}
curStripeBuf.clear();
curStripeRange = new StripeRange(0, 0);
@ -204,7 +209,8 @@ private void resetCurStripeBuffer() {
private ByteBuffer getParityBuffer() {
if (parityBuf == null) {
parityBuf = bufferPool.getBuffer(cellSize * parityBlkNum);
parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
cellSize * parityBlkNum);
}
parityBuf.clear();
return parityBuf;
@ -235,11 +241,11 @@ private synchronized void blockSeekTo(long target) throws IOException {
public synchronized void close() throws IOException {
super.close();
if (curStripeBuf != null) {
bufferPool.returnBuffer(curStripeBuf);
BUFFER_POOL.putBuffer(curStripeBuf);
curStripeBuf = null;
}
if (parityBuf != null) {
bufferPool.returnBuffer(parityBuf);
BUFFER_POOL.putBuffer(parityBuf);
parityBuf = null;
}
}

View File

@ -56,6 +56,8 @@
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@ -75,6 +77,9 @@
*/
@InterfaceAudience.Private
public class DFSStripedOutputStream extends DFSOutputStream {
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
static class MultipleBlockingQueue<T> {
private final List<BlockingQueue<T>> queues;
@ -208,7 +213,7 @@ class CellBuffers {
buffers = new ByteBuffer[numAllBlocks];
for (int i = 0; i < buffers.length; i++) {
buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
buffers[i] = BUFFER_POOL.getBuffer(useDirectBuffer(), cellSize);
}
}
@ -236,7 +241,10 @@ private void clear() {
private void release() {
for (int i = 0; i < numAllBlocks; i++) {
byteArrayManager.release(buffers[i].array());
if (buffers[i] != null) {
BUFFER_POOL.putBuffer(buffers[i]);
buffers[i] = null;
}
}
}
@ -311,6 +319,10 @@ private void flipDataBuffers() {
setCurrentStreamer(0);
}
private boolean useDirectBuffer() {
return encoder.preferDirectBuffer();
}
StripedDataStreamer getStripedDataStreamer(int i) {
return streamers.get(i);
}
@ -907,11 +919,20 @@ void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf)
if (current.isHealthy()) {
try {
DataChecksum sum = getDataChecksum();
sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
if (buffer.isDirect()) {
ByteBuffer directCheckSumBuf =
BUFFER_POOL.getBuffer(true, checksumBuf.length);
sum.calculateChunkedSums(buffer, directCheckSumBuf);
directCheckSumBuf.get(checksumBuf);
BUFFER_POOL.putBuffer(directCheckSumBuf);
} else {
sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
}
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
super.writeChunk(buffer, chunkLen, checksumBuf, ckOffset,
getChecksumSize());
}
} catch(Exception e) {

View File

@ -57,6 +57,7 @@ public StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
}
private void init() throws IOException {
initDecoderIfNecessary();
getStripedReader().init();
// allocate buffer to keep the reconstructed block data
targetBuffer = allocateBuffer(getBufferSize());
@ -150,8 +151,6 @@ private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen,
}
private void reconstructTargets(int toReconstructLen) {
initDecoderIfNecessary();
ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
ByteBuffer[] outputs = new ByteBuffer[1];

View File

@ -90,6 +90,10 @@ ByteBuffer getReadBuffer() {
return buffer;
}
void freeReadBuffer() {
buffer = null;
}
void resetBlockReader(long offsetInBlock) {
this.blockReader = createBlockReader(offsetInBlock);
}

View File

@ -49,6 +49,8 @@ boolean hasValidTargets() {
public void run() {
getDatanode().incrementXmitsInProgress();
try {
initDecoderIfNecessary();
getStripedReader().init();
stripedWriter.init();
@ -96,8 +98,6 @@ void reconstruct() throws IOException {
}
private void reconstructTargets(int toReconstructLen) {
initDecoderIfNecessary();
ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
int[] erasedIndices = stripedWriter.getRealTargetIndices();

View File

@ -30,6 +30,8 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@ -65,6 +67,7 @@ class StripedBlockWriter {
private ByteBuffer targetBuffer;
private long blockOffset4Target = 0;
private long seqNo4Target = 0;
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode,
Configuration conf, ExtendedBlock block,
@ -87,6 +90,10 @@ ByteBuffer getTargetBuffer() {
return targetBuffer;
}
void freeTargetBuffer() {
targetBuffer = null;
}
/**
* Initialize output/input streams for transferring data to target
* and send create block request.
@ -154,9 +161,18 @@ void transferData2Target(byte[] packetBuf) throws IOException {
return;
}
stripedWriter.getChecksum().calculateChunkedSums(
targetBuffer.array(), 0, targetBuffer.remaining(),
stripedWriter.getChecksumBuf(), 0);
if (targetBuffer.isDirect()) {
ByteBuffer directCheckSumBuf =
BUFFER_POOL.getBuffer(true, stripedWriter.getChecksumBuf().length);
stripedWriter.getChecksum().calculateChunkedSums(
targetBuffer, directCheckSumBuf);
directCheckSumBuf.get(stripedWriter.getChecksumBuf());
BUFFER_POOL.putBuffer(directCheckSumBuf);
} else {
stripedWriter.getChecksum().calculateChunkedSums(
targetBuffer.array(), 0, targetBuffer.remaining(),
stripedWriter.getChecksumBuf(), 0);
}
int ckOff = 0;
while (targetBuffer.remaining() > 0) {

View File

@ -180,7 +180,7 @@ private void initOrVerifyChecksum(StripedBlockReader reader) {
}
protected ByteBuffer allocateReadBuffer() {
return ByteBuffer.allocate(getBufferSize());
return reconstructor.allocateBuffer(getBufferSize());
}
private void initZeroStrip() {
@ -421,7 +421,16 @@ private static void cancelReads(Collection<Future<Void>> futures) {
}
void close() {
if (zeroStripeBuffers != null) {
for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
reconstructor.freeBuffer(zeroStripeBuffer);
}
}
zeroStripeBuffers = null;
for (StripedBlockReader reader : readers) {
reconstructor.freeBuffer(reader.getReadBuffer());
reader.freeReadBuffer();
reader.closeBlockReader();
}
}

View File

@ -25,6 +25,8 @@
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
@ -102,6 +104,7 @@ abstract class StripedReconstructor {
private final ErasureCodingPolicy ecPolicy;
private RawErasureDecoder decoder;
private final ExtendedBlock blockGroup;
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
// position in striped internal block
private long positionInBlock;
@ -139,8 +142,16 @@ abstract class StripedReconstructor {
*/
abstract void reconstruct() throws IOException;
boolean useDirectBuffer() {
return decoder.preferDirectBuffer();
}
ByteBuffer allocateBuffer(int length) {
return ByteBuffer.allocate(length);
return BUFFER_POOL.getBuffer(useDirectBuffer(), length);
}
void freeBuffer(ByteBuffer buffer) {
BUFFER_POOL.putBuffer(buffer);
}
ExtendedBlock getBlock(int i) {

View File

@ -297,6 +297,14 @@ void clearBuffers() {
}
void close() {
for (StripedBlockWriter writer : writers) {
ByteBuffer targetBuffer = writer.getTargetBuffer();
if (targetBuffer != null) {
reconstructor.freeBuffer(targetBuffer);
writer.freeTargetBuffer();
}
}
for (int i = 0; i < targets.length; i++) {
writers[i].close();
}

View File

@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@ -32,7 +33,9 @@
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.junit.After;
import org.junit.Assert;
@ -77,6 +80,11 @@ public class TestDFSStripedInputStream {
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set(
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName());
}
SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
DATA_BLK_NUM + PARITY_BLK_NUM).build();

View File

@ -25,8 +25,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
@ -65,6 +68,11 @@ public void setup() throws IOException {
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set(
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName());
}
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
fs = cluster.getFileSystem();

View File

@ -22,6 +22,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@ -36,6 +37,8 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
@ -183,6 +186,11 @@ static Integer getLength(int i) {
private void setup(Configuration conf) throws IOException {
final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set(
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName());
}
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
@ -229,7 +237,8 @@ public void testBlockTokenExpired() throws Exception {
final HdfsConfiguration conf = newHdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
for (int dn = 0; dn < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; dn += 2) {

View File

@ -33,6 +33,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -47,6 +48,8 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
@ -86,11 +89,17 @@ enum ReconstructionType {
public void setup() throws IOException {
final Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
conf.setInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
cellSize - 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set(
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_DEFAULT_RAWCODER_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName());
}
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
cluster.waitActive();