From 3c18a53cbd2efabb2ad108d63a0b0b558424115f Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Wed, 6 Apr 2016 22:50:24 -0700 Subject: [PATCH] HDFS-9719. Refactoring ErasureCodingWorker into smaller reusable constructs. Contributed by Kai Zheng. --- .../hadoop/hdfs/util/StripedBlockUtil.java | 22 +- .../erasurecode/ErasureCodingWorker.java | 1018 +---------------- .../erasurecode/StripedBlockReader.java | 202 ++++ .../erasurecode/StripedBlockWriter.java | 196 ++++ .../datanode/erasurecode/StripedReader.java | 466 ++++++++ .../erasurecode/StripedReconstructor.java | 273 +++++ .../datanode/erasurecode/StripedWriter.java | 313 +++++ .../datanode/erasurecode/package-info.java | 26 + .../hdfs/TestReconstructStripedFile.java | 11 +- 9 files changed, 1556 insertions(+), 971 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 0819376aac..c8827d9b56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -141,6 +141,12 @@ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, return locatedBlock; } + public static ExtendedBlock constructInternalBlock( + ExtendedBlock blockGroup, ErasureCodingPolicy ecPolicy, + int idxInBlockGroup) { + return constructInternalBlock(blockGroup, ecPolicy.getCellSize(), + ecPolicy.getNumDataUnits(), idxInBlockGroup); + } /** * This method creates an internal {@link ExtendedBlock} at the given index * of a block group. @@ -154,21 +160,28 @@ public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup, return block; } + public static long getInternalBlockLength(long dataSize, + ErasureCodingPolicy ecPolicy, + int idxInBlockGroup) { + return getInternalBlockLength(dataSize, ecPolicy.getCellSize(), + ecPolicy.getNumDataUnits(), idxInBlockGroup); + } + /** * Get the size of an internal block at the given index of a block group * * @param dataSize Size of the block group only counting data blocks * @param cellSize The size of a striping cell * @param numDataBlocks The number of data blocks - * @param i The logical index in the striped block group + * @param idxInBlockGroup The logical index in the striped block group * @return The size of the internal block at the specified index */ public static long getInternalBlockLength(long dataSize, - int cellSize, int numDataBlocks, int i) { + int cellSize, int numDataBlocks, int idxInBlockGroup) { Preconditions.checkArgument(dataSize >= 0); Preconditions.checkArgument(cellSize > 0); Preconditions.checkArgument(numDataBlocks > 0); - Preconditions.checkArgument(i >= 0); + Preconditions.checkArgument(idxInBlockGroup >= 0); // Size of each stripe (only counting data blocks) final int stripeSize = cellSize * numDataBlocks; // If block group ends at stripe boundary, each internal block has an equal @@ -180,7 +193,8 @@ public static long getInternalBlockLength(long dataSize, final int numStripes = (int) ((dataSize - 1) / stripeSize + 1); return (numStripes - 1L)*cellSize - + lastCellSize(lastStripeDataLen, cellSize, numDataBlocks, i); + + lastCellSize(lastStripeDataLen, cellSize, + numDataBlocks, idxInBlockGroup); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 4bcb291b28..e7c5abc2bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -17,169 +17,111 @@ */ package org.apache.hadoop.hdfs.server.datanode.erasurecode; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; +import org.apache.hadoop.util.Daemon; +import org.slf4j.Logger; + import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ChecksumException; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.BlockReader; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSPacket; -import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; -import org.apache.hadoop.hdfs.RemoteBlockReader2; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; -import org.apache.hadoop.hdfs.util.StripedBlockUtil; -import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.erasurecode.CodecUtil; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.DataChecksum; - -import com.google.common.base.Preconditions; -import org.slf4j.Logger; - /** * ErasureCodingWorker handles the erasure coding reconstruction work commands. - * These commands would be issued from Namenode as part of Datanode's heart - * beat response. BPOfferService delegates the work to this class for handling - * EC commands. + * These commands would be issued from Namenode as part of Datanode's heart beat + * response. BPOfferService delegates the work to this class for handling EC + * commands. */ @InterfaceAudience.Private public final class ErasureCodingWorker { private static final Logger LOG = DataNode.LOG; - - private final DataNode datanode; + + private final DataNode datanode; private final Configuration conf; - private ThreadPoolExecutor EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL; - private ThreadPoolExecutor EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL; - private final int EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS; - private final int EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE; + private ThreadPoolExecutor stripedReconstructionPool; + private ThreadPoolExecutor stripedReadPool; public ErasureCodingWorker(Configuration conf, DataNode datanode) { this.datanode = datanode; this.conf = conf; - EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS = conf.getInt( - DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY, - DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); initializeStripedReadThreadPool(conf.getInt( DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY, DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT)); - EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE = conf.getInt( - DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, - DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT); - initializeStripedBlkReconstructionThreadPool(conf.getInt( DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY, DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_DEFAULT)); } - - private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { - return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits); - } private void initializeStripedReadThreadPool(int num) { LOG.debug("Using striped reads; pool threads={}", num); - EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, - 60, TimeUnit.SECONDS, new SynchronousQueue(), + stripedReadPool = new ThreadPoolExecutor(1, num, 60, TimeUnit.SECONDS, + new SynchronousQueue(), new Daemon.DaemonFactory() { - private final AtomicInteger threadIndex = new AtomicInteger(0); + private final AtomicInteger threadIndex = new AtomicInteger(0); - @Override - public Thread newThread(Runnable r) { - Thread t = super.newThread(r); - t.setName("stripedRead-" + threadIndex.getAndIncrement()); - return t; - } - }, new ThreadPoolExecutor.CallerRunsPolicy() { - @Override - public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { - LOG.info("Execution for striped reading rejected, " - + "Executing in current thread"); - // will run in the current thread - super.rejectedExecution(runnable, e); - } - }); - EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedRead-" + threadIndex.getAndIncrement()); + return t; + } + }, + new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, + ThreadPoolExecutor e) { + LOG.info("Execution for striped reading rejected, " + + "Executing in current thread"); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }); + + stripedReadPool.allowCoreThreadTimeOut(true); } private void initializeStripedBlkReconstructionThreadPool(int num) { - LOG.debug("Using striped block reconstruction; pool threads={}" + num); - EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL = new ThreadPoolExecutor(2, num, - 60, TimeUnit.SECONDS, new LinkedBlockingQueue(), + LOG.debug("Using striped block reconstruction; pool threads={}", num); + stripedReconstructionPool = new ThreadPoolExecutor(2, num, 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue(), new Daemon.DaemonFactory() { private final AtomicInteger threadIdx = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread t = super.newThread(r); - t.setName( - "stripedBlockReconstruction-" + threadIdx.getAndIncrement()); + t.setName("stripedBlockReconstruction-" + + threadIdx.getAndIncrement()); return t; } }); - EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL.allowCoreThreadTimeOut(true); + stripedReconstructionPool.allowCoreThreadTimeOut(true); } /** * Handles the Erasure Coding reconstruction work commands. * - * @param ecTasks - * BlockECReconstructionInfo + * @param ecTasks BlockECReconstructionInfo + * */ public void processErasureCodingTasks( Collection ecTasks) { for (BlockECReconstructionInfo reconstructionInfo : ecTasks) { try { - ReconstructAndTransferBlock task = - new ReconstructAndTransferBlock(reconstructionInfo); + final StripedReconstructor task = + new StripedReconstructor(this, reconstructionInfo); if (task.hasValidTargets()) { - EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL.submit(task); + stripedReconstructionPool.submit(task); } else { LOG.warn("No missing internal block. Skip reconstruction for task:{}", reconstructionInfo); @@ -191,863 +133,15 @@ public void processErasureCodingTasks( } } - /** - * ReconstructAndTransferBlock reconstruct one or more missed striped block - * in the striped block group, the minimum number of live striped blocks - * should be no less than data block number. - * - * | <- Striped Block Group -> | - * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group - * | | | | - * v v v v - * +------+ +------+ +------+ +------+ - * |cell_0| |cell_1| |cell_2| |cell_3| ... - * +------+ +------+ +------+ +------+ - * |cell_4| |cell_5| |cell_6| |cell_7| ... - * +------+ +------+ +------+ +------+ - * |cell_8| |cell_9| |cell10| |cell11| ... - * +------+ +------+ +------+ +------+ - * ... ... ... ... - * - * - * We use following steps to reconstruct striped block group, in each round, - * we reconstruct bufferSize data until finish, the - * bufferSize is configurable and may be less or larger than - * cell size: - * step1: read bufferSize data from minimum number of sources - * required by reconstruction. - * step2: decode data for targets. - * step3: transfer data to targets. - * - * In step1, try to read bufferSize data from minimum number - * of sources , if there is corrupt or stale sources, read from new source - * will be scheduled. The best sources are remembered for next round and - * may be updated in each round. - * - * In step2, typically if source blocks we read are all data blocks, we - * need to call encode, and if there is one parity block, we need to call - * decode. Notice we only read once and reconstruct all missed striped block - * if they are more than one. - * - * In step3, send the reconstructed data to targets by constructing packet - * and send them directly. Same as continuous block replication, we - * don't check the packet ack. Since the datanode doing the reconstruction - * work are one of the source datanodes, so the reconstructed data are sent - * remotely. - * - * There are some points we can do further improvements in next phase: - * 1. we can read the block file directly on the local datanode, - * currently we use remote block reader. (Notice short-circuit is not - * a good choice, see inline comments). - * 2. We need to check the packet ack for EC reconstruction? Since EC - * reconstruction is more expensive than continuous block replication, - * it needs to read from several other datanodes, should we make sure - * the reconstructed result received by targets? - */ - private class ReconstructAndTransferBlock implements Runnable { - private final int dataBlkNum; - private final int parityBlkNum; - private final int cellSize; - - private RawErasureDecoder decoder; - - // Striped read buffer size - private int bufferSize; - - private final ExtendedBlock blockGroup; - private final int minRequiredSources; - // position in striped internal block - private long positionInBlock; - - // sources - private final byte[] liveIndices; - private final DatanodeInfo[] sources; - - private final List stripedReaders; - - // The buffers and indices for striped blocks whose length is 0 - private ByteBuffer[] zeroStripeBuffers; - private short[] zeroStripeIndices; - - // targets - private final DatanodeInfo[] targets; - private final StorageType[] targetStorageTypes; - - private final short[] targetIndices; - private final ByteBuffer[] targetBuffers; - - private final Socket[] targetSockets; - private final DataOutputStream[] targetOutputStreams; - private final DataInputStream[] targetInputStreams; - - private final long[] blockOffset4Targets; - private final long[] seqNo4Targets; - - private final static int WRITE_PACKET_SIZE = 64 * 1024; - private DataChecksum checksum; - private int maxChunksPerPacket; - private byte[] packetBuf; - private byte[] checksumBuf; - private int bytesPerChecksum; - private int checksumSize; - - private final CachingStrategy cachingStrategy; - - private final Map, Integer> futures = new HashMap<>(); - private final CompletionService readService = - new ExecutorCompletionService<>( - EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL); - private final boolean hasValidTargets; - - ReconstructAndTransferBlock(BlockECReconstructionInfo reconstructionInfo) { - ErasureCodingPolicy ecPolicy = reconstructionInfo - .getErasureCodingPolicy(); - dataBlkNum = ecPolicy.getNumDataUnits(); - parityBlkNum = ecPolicy.getNumParityUnits(); - cellSize = ecPolicy.getCellSize(); - - blockGroup = reconstructionInfo.getExtendedBlock(); - final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1); - minRequiredSources = Math.min(cellsNum, dataBlkNum); - - liveIndices = reconstructionInfo.getLiveBlockIndices(); - sources = reconstructionInfo.getSourceDnInfos(); - stripedReaders = new ArrayList<>(sources.length); - - Preconditions.checkArgument(liveIndices.length >= minRequiredSources, - "No enough live striped blocks."); - Preconditions.checkArgument(liveIndices.length == sources.length, - "liveBlockIndices and source dns should match"); - - if (minRequiredSources < dataBlkNum) { - zeroStripeBuffers = - new ByteBuffer[dataBlkNum - minRequiredSources]; - zeroStripeIndices = new short[dataBlkNum - minRequiredSources]; - } - - targets = reconstructionInfo.getTargetDnInfos(); - targetStorageTypes = reconstructionInfo.getTargetStorageTypes(); - targetIndices = new short[targets.length]; - targetBuffers = new ByteBuffer[targets.length]; - - Preconditions.checkArgument(targetIndices.length <= parityBlkNum, - "Too much missed striped blocks."); - - targetSockets = new Socket[targets.length]; - targetOutputStreams = new DataOutputStream[targets.length]; - targetInputStreams = new DataInputStream[targets.length]; - - blockOffset4Targets = new long[targets.length]; - seqNo4Targets = new long[targets.length]; - - for (int i = 0; i < targets.length; i++) { - blockOffset4Targets[i] = 0; - seqNo4Targets[i] = 0; - } - - hasValidTargets = getTargetIndices(); - cachingStrategy = CachingStrategy.newDefaultStrategy(); - } - - boolean hasValidTargets() { - return hasValidTargets; - } - - private ByteBuffer allocateBuffer(int length) { - return ByteBuffer.allocate(length); - } - - private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) { - return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize, - dataBlkNum, i); - } - - private long getBlockLen(ExtendedBlock blockGroup, int i) { - return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(), - cellSize, dataBlkNum, i); - } - - /** - * StripedReader is used to read from one source DN, it contains a block - * reader, buffer and striped block index. - * Only allocate StripedReader once for one source, and the StripedReader - * has the same array order with sources. Typically we only need to allocate - * minimum number (minRequiredSources) of StripedReader, and allocate - * new for new source DN if some existing DN invalid or slow. - * If some source DN is corrupt, set the corresponding blockReader to - * null and will never read from it again. - * - * @param i the array index of sources - * @param offsetInBlock offset for the internal block - * @return StripedReader - */ - private StripedReader addStripedReader(int i, long offsetInBlock) { - final ExtendedBlock block = getBlock(blockGroup, liveIndices[i]); - StripedReader reader = new StripedReader(liveIndices[i], block, sources[i]); - stripedReaders.add(reader); - - BlockReader blockReader = newBlockReader(block, offsetInBlock, sources[i]); - if (blockReader != null) { - initChecksumAndBufferSizeIfNeeded(blockReader); - reader.blockReader = blockReader; - } - reader.buffer = allocateBuffer(bufferSize); - return reader; - } - - @Override - public void run() { - datanode.incrementXmitsInProgress(); - try { - // Store the array indices of source DNs we have read successfully. - // In each iteration of read, the success list may be updated if - // some source DN is corrupted or slow. And use the updated success - // list of DNs for next iteration read. - int[] success = new int[minRequiredSources]; - - int nsuccess = 0; - for (int i = 0; - i < sources.length && nsuccess < minRequiredSources; i++) { - StripedReader reader = addStripedReader(i, 0); - if (reader.blockReader != null) { - success[nsuccess++] = i; - } - } - - if (nsuccess < minRequiredSources) { - String error = "Can't find minimum sources required by " - + "reconstruction, block id: " + blockGroup.getBlockId(); - throw new IOException(error); - } - - if (zeroStripeBuffers != null) { - for (int i = 0; i < zeroStripeBuffers.length; i++) { - zeroStripeBuffers[i] = allocateBuffer(bufferSize); - } - } - - for (int i = 0; i < targets.length; i++) { - targetBuffers[i] = allocateBuffer(bufferSize); - } - - checksumSize = checksum.getChecksumSize(); - int chunkSize = bytesPerChecksum + checksumSize; - maxChunksPerPacket = Math.max( - (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1); - int maxPacketSize = chunkSize * maxChunksPerPacket - + PacketHeader.PKT_MAX_HEADER_LEN; - - packetBuf = new byte[maxPacketSize]; - checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)]; - - // targetsStatus store whether some target is success, it will record - // any failed target once, if some target failed (invalid DN or transfer - // failed), will not transfer data to it any more. - boolean[] targetsStatus = new boolean[targets.length]; - if (initTargetStreams(targetsStatus) == 0) { - String error = "All targets are failed."; - throw new IOException(error); - } - - long maxTargetLength = 0; - for (short targetIndex : targetIndices) { - maxTargetLength = Math.max(maxTargetLength, - getBlockLen(blockGroup, targetIndex)); - } - while (positionInBlock < maxTargetLength) { - final int toReconstruct = (int) Math.min( - bufferSize, maxTargetLength - positionInBlock); - // step1: read from minimum source DNs required for reconstruction. - // The returned success list is the source DNs we do real read from - CorruptedBlocks corruptedBlocks = new CorruptedBlocks(); - try { - success = readMinimumStripedData4Reconstruction(success, - toReconstruct, corruptedBlocks); - } finally { - // report corrupted blocks to NN - datanode.reportCorruptedBlocks(corruptedBlocks); - } - - // step2: decode to reconstruct targets - reconstructTargets(success, targetsStatus, toReconstruct); - - // step3: transfer data - if (transferData2Targets(targetsStatus) == 0) { - String error = "Transfer failed for all targets."; - throw new IOException(error); - } - - clearBuffers(); - positionInBlock += toReconstruct; - } - - endTargetBlocks(targetsStatus); - - // Currently we don't check the acks for packets, this is similar as - // block replication. - } catch (Throwable e) { - LOG.warn("Failed to reconstruct striped block: {}", blockGroup, e); - } finally { - datanode.decrementXmitsInProgress(); - // close block readers - for (StripedReader stripedReader : stripedReaders) { - IOUtils.closeStream(stripedReader.blockReader); - } - for (int i = 0; i < targets.length; i++) { - IOUtils.closeStream(targetOutputStreams[i]); - IOUtils.closeStream(targetInputStreams[i]); - IOUtils.closeStream(targetSockets[i]); - } - } - } - - // init checksum from block reader - private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) { - if (checksum == null) { - checksum = blockReader.getDataChecksum(); - bytesPerChecksum = checksum.getBytesPerChecksum(); - // The bufferSize is flat to divide bytesPerChecksum - int readBufferSize = EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE; - bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum : - readBufferSize - readBufferSize % bytesPerChecksum; - } else { - assert blockReader.getDataChecksum().equals(checksum); - } - } - - /** - * @return true if there is valid target for reconstruction - */ - private boolean getTargetIndices() { - BitSet bitset = new BitSet(dataBlkNum + parityBlkNum); - for (int i = 0; i < sources.length; i++) { - bitset.set(liveIndices[i]); - } - int m = 0; - int k = 0; - boolean hasValidTarget = false; - for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { - if (!bitset.get(i)) { - if (getBlockLen(blockGroup, i) > 0) { - if (m < targets.length) { - targetIndices[m++] = (short)i; - hasValidTarget = true; - } - } else { - zeroStripeIndices[k++] = (short)i; - } - } - } - return hasValidTarget; - } - - /** the reading length should not exceed the length for reconstruction. */ - private int getReadLength(int index, int reconstructLength) { - long blockLen = getBlockLen(blockGroup, index); - long remaining = blockLen - positionInBlock; - return (int) Math.min(remaining, reconstructLength); - } - - /** - * Read from minimum source DNs required for reconstruction in the iteration. - * First try the success list which we think they are the best DNs - * If source DN is corrupt or slow, try to read some other source DN, - * and will update the success list. - * - * Remember the updated success list and return it for following - * operations and next iteration read. - * - * @param success the initial success list of source DNs we think best - * @param reconstructLength the length to reconstruct. - * @return updated success list of source DNs we do real read - * @throws IOException - */ - private int[] readMinimumStripedData4Reconstruction(final int[] success, - int reconstructLength, CorruptedBlocks corruptedBlocks) - throws IOException { - Preconditions.checkArgument(reconstructLength >= 0 && - reconstructLength <= bufferSize); - int nsuccess = 0; - int[] newSuccess = new int[minRequiredSources]; - BitSet used = new BitSet(sources.length); - /* - * Read from minimum source DNs required, the success list contains - * source DNs which we think best. - */ - for (int i = 0; i < minRequiredSources; i++) { - StripedReader reader = stripedReaders.get(success[i]); - final int toRead = getReadLength(liveIndices[success[i]], - reconstructLength); - if (toRead > 0) { - Callable readCallable = readFromBlock(reader, reader.buffer, - toRead, corruptedBlocks); - Future f = readService.submit(readCallable); - futures.put(f, success[i]); - } else { - // If the read length is 0, we don't need to do real read - reader.buffer.position(0); - newSuccess[nsuccess++] = success[i]; - } - used.set(success[i]); - } - - while (!futures.isEmpty()) { - try { - StripingChunkReadResult result = StripedBlockUtil - .getNextCompletedStripedRead(readService, futures, - EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS); - int resultIndex = -1; - if (result.state == StripingChunkReadResult.SUCCESSFUL) { - resultIndex = result.index; - } else if (result.state == StripingChunkReadResult.FAILED) { - // If read failed for some source DN, we should not use it anymore - // and schedule read from another source DN. - StripedReader failedReader = stripedReaders.get(result.index); - IOUtils.closeStream(failedReader.blockReader); - failedReader.blockReader = null; - resultIndex = scheduleNewRead(used, reconstructLength, - corruptedBlocks); - } else if (result.state == StripingChunkReadResult.TIMEOUT) { - // If timeout, we also schedule a new read. - resultIndex = scheduleNewRead(used, reconstructLength, - corruptedBlocks); - } - if (resultIndex >= 0) { - newSuccess[nsuccess++] = resultIndex; - if (nsuccess >= minRequiredSources) { - // cancel remaining reads if we read successfully from minimum - // number of source DNs required by reconstruction. - cancelReads(futures.keySet()); - futures.clear(); - break; - } - } - } catch (InterruptedException e) { - LOG.info("Read data interrupted.", e); - cancelReads(futures.keySet()); - futures.clear(); - break; - } - } - - if (nsuccess < minRequiredSources) { - String error = "Can't read data from minimum number of sources " - + "required by reconstruction, block id: " + blockGroup.getBlockId(); - throw new IOException(error); - } - - return newSuccess; - } - - private void paddingBufferToLen(ByteBuffer buffer, int len) { - if (len > buffer.limit()) { - buffer.limit(len); - } - int toPadding = len - buffer.position(); - for (int i = 0; i < toPadding; i++) { - buffer.put((byte) 0); - } - } - - // Initialize decoder - private void initDecoderIfNecessary() { - if (decoder == null) { - decoder = newDecoder(dataBlkNum, parityBlkNum); - } - } - - private int[] getErasedIndices(boolean[] targetsStatus) { - int[] result = new int[targets.length]; - int m = 0; - for (int i = 0; i < targets.length; i++) { - if (targetsStatus[i]) { - result[m++] = targetIndices[i]; - } - } - return Arrays.copyOf(result, m); - } - - private void reconstructTargets(int[] success, boolean[] targetsStatus, - int toReconstructLen) { - initDecoderIfNecessary(); - ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; - for (int i = 0; i < success.length; i++) { - StripedReader reader = stripedReaders.get(success[i]); - ByteBuffer buffer = reader.buffer; - paddingBufferToLen(buffer, toReconstructLen); - inputs[reader.index] = (ByteBuffer)buffer.flip(); - } - if (success.length < dataBlkNum) { - for (int i = 0; i < zeroStripeBuffers.length; i++) { - ByteBuffer buffer = zeroStripeBuffers[i]; - paddingBufferToLen(buffer, toReconstructLen); - int index = zeroStripeIndices[i]; - inputs[index] = (ByteBuffer)buffer.flip(); - } - } - int[] erasedIndices = getErasedIndices(targetsStatus); - ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length]; - int m = 0; - for (int i = 0; i < targetBuffers.length; i++) { - if (targetsStatus[i]) { - targetBuffers[i].limit(toReconstructLen); - outputs[m++] = targetBuffers[i]; - } - } - decoder.decode(inputs, erasedIndices, outputs); - - for (int i = 0; i < targets.length; i++) { - if (targetsStatus[i]) { - long blockLen = getBlockLen(blockGroup, targetIndices[i]); - long remaining = blockLen - positionInBlock; - if (remaining <= 0) { - targetBuffers[i].limit(0); - } else if (remaining < toReconstructLen) { - targetBuffers[i].limit((int)remaining); - } - } - } - } - - /** - * Schedule a read from some new source DN if some DN is corrupted - * or slow, this is called from the read iteration. - * Initially we may only have minRequiredSources number of - * StripedReader. - * If the position is at the end of target block, don't need to do - * real read, and return the array index of source DN, otherwise -1. - * - * @param used the used source DNs in this iteration. - * @return the array index of source DN if don't need to do real read. - */ - private int scheduleNewRead(BitSet used, int reconstructLen, - CorruptedBlocks corruptedBlocks) { - StripedReader reader = null; - // step1: initially we may only have minRequiredSources - // number of StripedReader, and there may be some source DNs we never - // read before, so will try to create StripedReader for one new source DN - // and try to read from it. If found, go to step 3. - int m = stripedReaders.size(); - int toRead = 0; - while (reader == null && m < sources.length) { - reader = addStripedReader(m, positionInBlock); - toRead = getReadLength(liveIndices[m], reconstructLen); - if (toRead > 0) { - if (reader.blockReader == null) { - reader = null; - m++; - } - } else { - used.set(m); - return m; - } - } - - // step2: if there is no new source DN we can use, try to find a source - // DN we ever read from but because some reason, e.g., slow, it - // is not in the success DN list at the begin of this iteration, so - // we have not tried it in this iteration. Now we have a chance to - // revisit it again. - for (int i = 0; reader == null && i < stripedReaders.size(); i++) { - if (!used.get(i)) { - StripedReader r = stripedReaders.get(i); - toRead = getReadLength(liveIndices[i], reconstructLen); - if (toRead > 0) { - IOUtils.closeStream(r.blockReader); - r.blockReader = newBlockReader( - getBlock(blockGroup, liveIndices[i]), positionInBlock, - sources[i]); - if (r.blockReader != null) { - r.buffer.position(0); - m = i; - reader = r; - } - } else { - used.set(i); - r.buffer.position(0); - return i; - } - } - } - - // step3: schedule if find a correct source DN and need to do real read. - if (reader != null) { - Callable readCallable = readFromBlock(reader, reader.buffer, - toRead, corruptedBlocks); - Future f = readService.submit(readCallable); - futures.put(f, m); - used.set(m); - } - - return -1; - } - - // cancel all reads. - private void cancelReads(Collection> futures) { - for (Future future : futures) { - future.cancel(true); - } - } - - private Callable readFromBlock(final StripedReader reader, - final ByteBuffer buf, final int length, - final CorruptedBlocks corruptedBlocks) { - return new Callable() { - - @Override - public Void call() throws Exception { - try { - buf.limit(length); - actualReadFromBlock(reader.blockReader, buf); - return null; - } catch (ChecksumException e) { - LOG.warn("Found Checksum error for {} from {} at {}", reader.block, - reader.source, e.getPos()); - corruptedBlocks.addCorruptedBlock(reader.block, reader.source); - throw e; - } catch (IOException e) { - LOG.info(e.getMessage()); - throw e; - } - } - - }; - } - - /** - * Read bytes from block - */ - private void actualReadFromBlock(BlockReader reader, ByteBuffer buf) - throws IOException { - int len = buf.remaining(); - int n = 0; - while (n < len) { - int nread = reader.read(buf); - if (nread <= 0) { - break; - } - n += nread; - } - } - - private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { - return NetUtils.createSocketAddr(dnInfo.getXferAddr( - datanode.getDnConf().getConnectToDnViaHostname())); - } - - private BlockReader newBlockReader(final ExtendedBlock block, - long offsetInBlock, DatanodeInfo dnInfo) { - if (offsetInBlock >= block.getNumBytes()) { - return null; - } - try { - InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo); - Token blockToken = datanode.getBlockAccessToken( - block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ)); - /* - * This can be further improved if the replica is local, then we can - * read directly from DN and need to check the replica is FINALIZED - * state, notice we should not use short-circuit local read which - * requires config for domain-socket in UNIX or legacy config in Windows. - * The network distance value isn't used for this scenario. - */ - return RemoteBlockReader2.newBlockReader( - "dummy", block, blockToken, offsetInBlock, - block.getNumBytes() - offsetInBlock, true, - "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo, - null, cachingStrategy, datanode.getTracer(), -1); - } catch (IOException e) { - LOG.debug("Exception while creating remote block reader, datanode {}", - dnInfo, e); - return null; - } - } - - private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr, - Token blockToken, DatanodeID datanodeId) - throws IOException { - Peer peer = null; - boolean success = false; - Socket sock = null; - final int socketTimeout = datanode.getDnConf().getSocketTimeout(); - try { - sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); - NetUtils.connect(sock, addr, socketTimeout); - peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(), - sock, datanode.getDataEncryptionKeyFactoryForBlock(b), - blockToken, datanodeId, socketTimeout); - success = true; - return peer; - } finally { - if (!success) { - IOUtils.cleanup(null, peer); - IOUtils.closeSocket(sock); - } - } - } - - /** - * Send data to targets - */ - private int transferData2Targets(boolean[] targetsStatus) { - int nsuccess = 0; - for (int i = 0; i < targets.length; i++) { - if (targetsStatus[i]) { - boolean success = false; - try { - ByteBuffer buffer = targetBuffers[i]; - - if (buffer.remaining() == 0) { - continue; - } - - checksum.calculateChunkedSums( - buffer.array(), 0, buffer.remaining(), checksumBuf, 0); - - int ckOff = 0; - while (buffer.remaining() > 0) { - DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket, - blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false); - int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum; - int toWrite = buffer.remaining() > maxBytesToPacket ? - maxBytesToPacket : buffer.remaining(); - int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize; - packet.writeChecksum(checksumBuf, ckOff, ckLen); - ckOff += ckLen; - packet.writeData(buffer, toWrite); - - // Send packet - packet.writeTo(targetOutputStreams[i]); - - blockOffset4Targets[i] += toWrite; - nsuccess++; - success = true; - } - } catch (IOException e) { - LOG.warn(e.getMessage()); - } - targetsStatus[i] = success; - } - } - return nsuccess; - } - - /** - * clear all buffers - */ - private void clearBuffers() { - for (StripedReader stripedReader : stripedReaders) { - if (stripedReader.buffer != null) { - stripedReader.buffer.clear(); - } - } - - if (zeroStripeBuffers != null) { - for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) { - zeroStripeBuffer.clear(); - } - } - - for (ByteBuffer targetBuffer : targetBuffers) { - if (targetBuffer != null) { - targetBuffer.clear(); - } - } - } - - // send an empty packet to mark the end of the block - private void endTargetBlocks(boolean[] targetsStatus) { - for (int i = 0; i < targets.length; i++) { - if (targetsStatus[i]) { - try { - DFSPacket packet = new DFSPacket(packetBuf, 0, - blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true); - packet.writeTo(targetOutputStreams[i]); - targetOutputStreams[i].flush(); - } catch (IOException e) { - LOG.warn(e.getMessage()); - } - } - } - } - - /** - * Initialize output/input streams for transferring data to target - * and send create block request. - */ - private int initTargetStreams(boolean[] targetsStatus) { - int nsuccess = 0; - for (int i = 0; i < targets.length; i++) { - Socket socket = null; - DataOutputStream out = null; - DataInputStream in = null; - boolean success = false; - try { - InetSocketAddress targetAddr = - getSocketAddress4Transfer(targets[i]); - socket = datanode.newSocket(); - NetUtils.connect(socket, targetAddr, - datanode.getDnConf().getSocketTimeout()); - socket.setSoTimeout(datanode.getDnConf().getSocketTimeout()); - - ExtendedBlock block = getBlock(blockGroup, targetIndices[i]); - Token blockToken = - datanode.getBlockAccessToken(block, - EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); - - long writeTimeout = datanode.getDnConf().getSocketWriteTimeout(); - OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout); - InputStream unbufIn = NetUtils.getInputStream(socket); - DataEncryptionKeyFactory keyFactory = - datanode.getDataEncryptionKeyFactoryForBlock(block); - IOStreamPair saslStreams = datanode.getSaslClient().socketSend( - socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]); - - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - DFSUtilClient.getSmallBufferSize(conf))); - in = new DataInputStream(unbufIn); - - DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId()); - new Sender(out).writeBlock(block, targetStorageTypes[i], - blockToken, "", new DatanodeInfo[]{targets[i]}, - new StorageType[]{targetStorageTypes[i]}, source, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, - checksum, cachingStrategy, false, false, null); - - targetSockets[i] = socket; - targetOutputStreams[i] = out; - targetInputStreams[i] = in; - nsuccess++; - success = true; - } catch (Throwable e) { - LOG.warn(e.getMessage()); - } finally { - if (!success) { - IOUtils.closeStream(out); - IOUtils.closeStream(in); - IOUtils.closeStream(socket); - } - } - targetsStatus[i] = success; - } - return nsuccess; - } + DataNode getDatanode() { + return datanode; } - private static class StripedReader { - private final short index; // internal block index - private BlockReader blockReader; - private ByteBuffer buffer; - private final ExtendedBlock block; - private final DatanodeInfo source; + Configuration getConf() { + return conf; + } - StripedReader(short index, ExtendedBlock block, DatanodeInfo source) { - this.index = index; - this.block = block; - this.source = source; - } + ThreadPoolExecutor getStripedReadPool() { + return stripedReadPool; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java new file mode 100644 index 0000000000..7f71bf76dd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; +import org.apache.hadoop.hdfs.RemoteBlockReader2; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.concurrent.Callable; + +/** + * StripedBlockReader is used to read block data from one source DN, it contains + * a block reader, read buffer and striped block index. + * Only allocate StripedBlockReader once for one source, and the StripedReader + * has the same array order with sources. Typically we only need to allocate + * minimum number (minRequiredSources) of StripedReader, and allocate + * new for new source DN if some existing DN invalid or slow. + * If some source DN is corrupt, set the corresponding blockReader to + * null and will never read from it again. + */ +@InterfaceAudience.Private +class StripedBlockReader { + private static final Logger LOG = DataNode.LOG; + + private StripedReader stripedReader; + private final DataNode datanode; + private final Configuration conf; + + private final short index; // internal block index + private final ExtendedBlock block; + private final DatanodeInfo source; + private BlockReader blockReader; + private ByteBuffer buffer; + + StripedBlockReader(StripedReader stripedReader, DataNode datanode, + Configuration conf, short index, ExtendedBlock block, + DatanodeInfo source, long offsetInBlock) { + this.stripedReader = stripedReader; + this.datanode = datanode; + this.conf = conf; + + this.index = index; + this.source = source; + this.block = block; + + BlockReader tmpBlockReader = createBlockReader(offsetInBlock); + if (tmpBlockReader != null) { + this.blockReader = tmpBlockReader; + } + } + + ByteBuffer getReadBuffer() { + if (buffer == null) { + this.buffer = stripedReader.allocateReadBuffer(); + } + return buffer; + } + + void resetBlockReader(long offsetInBlock) { + this.blockReader = createBlockReader(offsetInBlock); + } + + private BlockReader createBlockReader(long offsetInBlock) { + if (offsetInBlock >= block.getNumBytes()) { + return null; + } + try { + InetSocketAddress dnAddr = + stripedReader.getSocketAddress4Transfer(source); + Token blockToken = datanode.getBlockAccessToken( + block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ)); + /* + * This can be further improved if the replica is local, then we can + * read directly from DN and need to check the replica is FINALIZED + * state, notice we should not use short-circuit local read which + * requires config for domain-socket in UNIX or legacy config in + * Windows. The network distance value isn't used for this scenario. + * + * TODO: add proper tracer + */ + return RemoteBlockReader2.newBlockReader( + "dummy", block, blockToken, offsetInBlock, + block.getNumBytes() - offsetInBlock, true, + "", newConnectedPeer(block, dnAddr, blockToken, source), source, + null, stripedReader.getCachingStrategy(), datanode.getTracer(), -1); + } catch (IOException e) { + LOG.debug("Exception while creating remote block reader, datanode {}", + source, e); + return null; + } + } + + private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr, + Token blockToken, + DatanodeID datanodeId) + throws IOException { + Peer peer = null; + boolean success = false; + Socket sock = null; + final int socketTimeout = datanode.getDnConf().getSocketTimeout(); + try { + sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); + NetUtils.connect(sock, addr, socketTimeout); + peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(), + sock, datanode.getDataEncryptionKeyFactoryForBlock(b), + blockToken, datanodeId, socketTimeout); + success = true; + return peer; + } finally { + if (!success) { + IOUtils.cleanup(null, peer); + IOUtils.closeSocket(sock); + } + } + } + + Callable readFromBlock(final int length, + final CorruptedBlocks corruptedBlocks) { + return new Callable() { + + @Override + public Void call() throws Exception { + try { + getReadBuffer().limit(length); + actualReadFromBlock(); + return null; + } catch (ChecksumException e) { + LOG.warn("Found Checksum error for {} from {} at {}", block, + source, e.getPos()); + corruptedBlocks.addCorruptedBlock(block, source); + throw e; + } catch (IOException e) { + LOG.info(e.getMessage()); + throw e; + } + } + }; + } + + /** + * Perform actual reading of bytes from block. + */ + private void actualReadFromBlock() throws IOException { + int len = buffer.remaining(); + int n = 0; + while (n < len) { + int nread = blockReader.read(buffer); + if (nread <= 0) { + break; + } + n += nread; + } + } + + // close block reader + void closeBlockReader() { + IOUtils.closeStream(blockReader); + blockReader = null; + } + + short getIndex() { + return index; + } + + BlockReader getBlockReader() { + return blockReader; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java new file mode 100644 index 0000000000..a62f3c28c7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSPacket; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +/** + * A striped block writer that writes reconstructed data to the remote target + * datanode. + */ +@InterfaceAudience.Private +class StripedBlockWriter { + private final StripedWriter stripedWriter; + private final DataNode datanode; + private final Configuration conf; + + private final ExtendedBlock block; + private final DatanodeInfo target; + private final StorageType storageType; + + private Socket targetSocket; + private DataOutputStream targetOutputStream; + private DataInputStream targetInputStream; + private ByteBuffer targetBuffer; + private long blockOffset4Target = 0; + private long seqNo4Target = 0; + + StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode, + Configuration conf, ExtendedBlock block, + DatanodeInfo target, StorageType storageType) + throws IOException { + this.stripedWriter = stripedWriter; + this.datanode = datanode; + this.conf = conf; + + this.block = block; + this.target = target; + this.storageType = storageType; + + this.targetBuffer = stripedWriter.allocateWriteBuffer(); + + init(); + } + + ByteBuffer getTargetBuffer() { + return targetBuffer; + } + + /** + * Initialize output/input streams for transferring data to target + * and send create block request. + */ + private void init() throws IOException { + Socket socket = null; + DataOutputStream out = null; + DataInputStream in = null; + boolean success = false; + try { + InetSocketAddress targetAddr = + stripedWriter.getSocketAddress4Transfer(target); + socket = datanode.newSocket(); + NetUtils.connect(socket, targetAddr, + datanode.getDnConf().getSocketTimeout()); + socket.setSoTimeout(datanode.getDnConf().getSocketTimeout()); + + Token blockToken = + datanode.getBlockAccessToken(block, + EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); + + long writeTimeout = datanode.getDnConf().getSocketWriteTimeout(); + OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(socket); + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.getSaslClient().socketSend( + socket, unbufOut, unbufIn, keyFactory, blockToken, target); + + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + DFSUtilClient.getSmallBufferSize(conf))); + in = new DataInputStream(unbufIn); + + DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId()); + new Sender(out).writeBlock(block, storageType, + blockToken, "", new DatanodeInfo[]{target}, + new StorageType[]{storageType}, source, + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, + stripedWriter.getChecksum(), stripedWriter.getCachingStrategy(), + false, false, null); + + targetSocket = socket; + targetOutputStream = out; + targetInputStream = in; + success = true; + } finally { + if (!success) { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeStream(socket); + } + } + } + + /** + * Send data to targets. + */ + void transferData2Target(byte[] packetBuf) throws IOException { + if (targetBuffer.remaining() == 0) { + return; + } + + stripedWriter.getChecksum().calculateChunkedSums( + targetBuffer.array(), 0, targetBuffer.remaining(), + stripedWriter.getChecksumBuf(), 0); + + int ckOff = 0; + while (targetBuffer.remaining() > 0) { + DFSPacket packet = new DFSPacket(packetBuf, + stripedWriter.getMaxChunksPerPacket(), + blockOffset4Target, seqNo4Target++, + stripedWriter.getChecksumSize(), false); + int maxBytesToPacket = stripedWriter.getMaxChunksPerPacket() + * stripedWriter.getBytesPerChecksum(); + int toWrite = targetBuffer.remaining() > maxBytesToPacket ? + maxBytesToPacket : targetBuffer.remaining(); + int ckLen = ((toWrite - 1) / stripedWriter.getBytesPerChecksum() + 1) + * stripedWriter.getChecksumSize(); + packet.writeChecksum(stripedWriter.getChecksumBuf(), ckOff, ckLen); + ckOff += ckLen; + packet.writeData(targetBuffer, toWrite); + + // Send packet + packet.writeTo(targetOutputStream); + + blockOffset4Target += toWrite; + } + } + + // send an empty packet to mark the end of the block + void endTargetBlock(byte[] packetBuf) throws IOException { + DFSPacket packet = new DFSPacket(packetBuf, 0, + blockOffset4Target, seqNo4Target++, + stripedWriter.getChecksumSize(), true); + packet.writeTo(targetOutputStream); + targetOutputStream.flush(); + } + + void close() { + IOUtils.closeStream(targetOutputStream); + IOUtils.closeStream(targetInputStream); + IOUtils.closeStream(targetSocket); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java new file mode 100644 index 0000000000..fb7699a5cd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java @@ -0,0 +1,466 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; +import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.Future; + +/** + * Manage striped readers that performs reading of block data from remote to + * serve input data for the erasure decoding. + */ +@InterfaceAudience.Private +class StripedReader { + private static final Logger LOG = DataNode.LOG; + + private final int stripedReadTimeoutInMills; + private final int stripedReadBufferSize; + + private StripedReconstructor reconstructor; + private final DataNode datanode; + private final Configuration conf; + + private final int dataBlkNum; + private final int parityBlkNum; + + + private DataChecksum checksum; + // Striped read buffer size + private int bufferSize; + private int[] successList; + + private final int minRequiredSources; + // The buffers and indices for striped blocks whose length is 0 + private ByteBuffer[] zeroStripeBuffers; + private short[] zeroStripeIndices; + + // sources + private final byte[] liveIndices; + private final DatanodeInfo[] sources; + + private final List readers; + + private final Map, Integer> futures = new HashMap<>(); + private final CompletionService readService; + + StripedReader(StripedReconstructor reconstructor, DataNode datanode, + Configuration conf, + BlockECReconstructionInfo reconstructionInfo) { + stripedReadTimeoutInMills = conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); + stripedReadBufferSize = conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT); + + this.reconstructor = reconstructor; + this.datanode = datanode; + this.conf = conf; + + ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy(); + dataBlkNum = ecPolicy.getNumDataUnits(); + parityBlkNum = ecPolicy.getNumParityUnits(); + + ExtendedBlock blockGroup = reconstructionInfo.getExtendedBlock(); + int cellsNum = (int)((blockGroup.getNumBytes() - 1) / ecPolicy.getCellSize() + + 1); + minRequiredSources = Math.min(cellsNum, dataBlkNum); + + if (minRequiredSources < dataBlkNum) { + int zeroStripNum = dataBlkNum - minRequiredSources; + zeroStripeBuffers = new ByteBuffer[zeroStripNum]; + zeroStripeIndices = new short[zeroStripNum]; + } + + liveIndices = reconstructionInfo.getLiveBlockIndices(); + sources = reconstructionInfo.getSourceDnInfos(); + + readers = new ArrayList<>(sources.length); + readService = reconstructor.createReadService(); + + Preconditions.checkArgument(liveIndices.length >= minRequiredSources, + "No enough live striped blocks."); + Preconditions.checkArgument(liveIndices.length == sources.length, + "liveBlockIndices and source datanodes should match"); + } + + void init() throws IOException { + initReaders(); + + initBufferSize(); + + initZeroStrip(); + } + + private void initReaders() throws IOException { + // Store the array indices of source DNs we have read successfully. + // In each iteration of read, the successList list may be updated if + // some source DN is corrupted or slow. And use the updated successList + // list of DNs for next iteration read. + successList = new int[minRequiredSources]; + + StripedBlockReader reader; + int nSuccess = 0; + for (int i = 0; i < sources.length && nSuccess < minRequiredSources; i++) { + reader = createReader(i, 0); + readers.add(reader); + if (reader.getBlockReader() != null) { + initOrVerifyChecksum(reader); + successList[nSuccess++] = i; + } + } + + if (nSuccess < minRequiredSources) { + String error = "Can't find minimum sources required by " + + "reconstruction, block id: " + + reconstructor.getBlockGroup().getBlockId(); + throw new IOException(error); + } + } + + StripedBlockReader createReader(int idxInSources, long offsetInBlock) { + return new StripedBlockReader(this, datanode, + conf, liveIndices[idxInSources], + reconstructor.getBlock(liveIndices[idxInSources]), + sources[idxInSources], offsetInBlock); + } + + private void initBufferSize() { + int bytesPerChecksum = checksum.getBytesPerChecksum(); + // The bufferSize is flat to divide bytesPerChecksum + int readBufferSize = stripedReadBufferSize; + bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum : + readBufferSize - readBufferSize % bytesPerChecksum; + } + + // init checksum from block reader + private void initOrVerifyChecksum(StripedBlockReader reader) { + if (checksum == null) { + checksum = reader.getBlockReader().getDataChecksum(); + } else { + assert reader.getBlockReader().getDataChecksum().equals(checksum); + } + } + + protected ByteBuffer allocateReadBuffer() { + return ByteBuffer.allocate(getBufferSize()); + } + + private void initZeroStrip() { + if (zeroStripeBuffers != null) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + zeroStripeBuffers[i] = reconstructor.allocateBuffer(bufferSize); + } + } + + BitSet bitset = reconstructor.getLiveBitSet(); + int k = 0; + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { + if (!bitset.get(i)) { + if (reconstructor.getBlockLen(i) <= 0) { + zeroStripeIndices[k++] = (short)i; + } + } + } + } + + private int getReadLength(int index, int reconstructLength) { + // the reading length should not exceed the length for reconstruction + long blockLen = reconstructor.getBlockLen(index); + long remaining = blockLen - reconstructor.getPositionInBlock(); + return (int) Math.min(remaining, reconstructLength); + } + + ByteBuffer[] getInputBuffers(int toReconstructLen) { + ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; + + for (int i = 0; i < successList.length; i++) { + int index = successList[i]; + StripedBlockReader reader = getReader(index); + ByteBuffer buffer = reader.getReadBuffer(); + paddingBufferToLen(buffer, toReconstructLen); + inputs[reader.getIndex()] = (ByteBuffer)buffer.flip(); + } + + if (successList.length < dataBlkNum) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + ByteBuffer buffer = zeroStripeBuffers[i]; + paddingBufferToLen(buffer, toReconstructLen); + int index = zeroStripeIndices[i]; + inputs[index] = (ByteBuffer)buffer.flip(); + } + } + + return inputs; + } + + private void paddingBufferToLen(ByteBuffer buffer, int len) { + if (len > buffer.limit()) { + buffer.limit(len); + } + int toPadding = len - buffer.position(); + for (int i = 0; i < toPadding; i++) { + buffer.put((byte) 0); + } + } + + /** + * Read from minimum source DNs required for reconstruction in the iteration. + * First try the success list which we think they are the best DNs + * If source DN is corrupt or slow, try to read some other source DN, + * and will update the success list. + * + * Remember the updated success list and return it for following + * operations and next iteration read. + * + * @param reconstructLength the length to reconstruct. + * @return updated success list of source DNs we do real read + * @throws IOException + */ + void readMinimumSources(int reconstructLength) throws IOException { + CorruptedBlocks corruptedBlocks = new CorruptedBlocks(); + try { + successList = doReadMinimumSources(reconstructLength, corruptedBlocks); + } finally { + // report corrupted blocks to NN + datanode.reportCorruptedBlocks(corruptedBlocks); + } + } + + int[] doReadMinimumSources(int reconstructLength, + CorruptedBlocks corruptedBlocks) + throws IOException { + Preconditions.checkArgument(reconstructLength >= 0 && + reconstructLength <= bufferSize); + int nSuccess = 0; + int[] newSuccess = new int[minRequiredSources]; + BitSet usedFlag = new BitSet(sources.length); + /* + * Read from minimum source DNs required, the success list contains + * source DNs which we think best. + */ + for (int i = 0; i < minRequiredSources; i++) { + StripedBlockReader reader = readers.get(successList[i]); + int toRead = getReadLength(liveIndices[successList[i]], + reconstructLength); + if (toRead > 0) { + Callable readCallable = + reader.readFromBlock(toRead, corruptedBlocks); + Future f = readService.submit(readCallable); + futures.put(f, successList[i]); + } else { + // If the read length is 0, we don't need to do real read + reader.getReadBuffer().position(0); + newSuccess[nSuccess++] = successList[i]; + } + usedFlag.set(successList[i]); + } + + while (!futures.isEmpty()) { + try { + StripingChunkReadResult result = + StripedBlockUtil.getNextCompletedStripedRead( + readService, futures, stripedReadTimeoutInMills); + int resultIndex = -1; + if (result.state == StripingChunkReadResult.SUCCESSFUL) { + resultIndex = result.index; + } else if (result.state == StripingChunkReadResult.FAILED) { + // If read failed for some source DN, we should not use it anymore + // and schedule read from another source DN. + StripedBlockReader failedReader = readers.get(result.index); + failedReader.closeBlockReader(); + resultIndex = scheduleNewRead(usedFlag, + reconstructLength, corruptedBlocks); + } else if (result.state == StripingChunkReadResult.TIMEOUT) { + // If timeout, we also schedule a new read. + resultIndex = scheduleNewRead(usedFlag, + reconstructLength, corruptedBlocks); + } + if (resultIndex >= 0) { + newSuccess[nSuccess++] = resultIndex; + if (nSuccess >= minRequiredSources) { + // cancel remaining reads if we read successfully from minimum + // number of source DNs required by reconstruction. + cancelReads(futures.keySet()); + futures.clear(); + break; + } + } + } catch (InterruptedException e) { + LOG.info("Read data interrupted.", e); + cancelReads(futures.keySet()); + futures.clear(); + break; + } + } + + if (nSuccess < minRequiredSources) { + String error = "Can't read data from minimum number of sources " + + "required by reconstruction, block id: " + + reconstructor.getBlockGroup().getBlockId(); + throw new IOException(error); + } + + return newSuccess; + } + + /** + * Schedule a read from some new source DN if some DN is corrupted + * or slow, this is called from the read iteration. + * Initially we may only have minRequiredSources number of + * StripedBlockReader. + * If the position is at the end of target block, don't need to do + * real read, and return the array index of source DN, otherwise -1. + * + * @param used the used source DNs in this iteration. + * @return the array index of source DN if don't need to do real read. + */ + private int scheduleNewRead(BitSet used, int reconstructLength, + CorruptedBlocks corruptedBlocks) { + StripedBlockReader reader = null; + // step1: initially we may only have minRequiredSources + // number of StripedBlockReader, and there may be some source DNs we never + // read before, so will try to create StripedBlockReader for one new source + // DN and try to read from it. If found, go to step 3. + int m = readers.size(); + int toRead = 0; + while (reader == null && m < sources.length) { + reader = createReader(m, reconstructor.getPositionInBlock()); + readers.add(reader); + toRead = getReadLength(liveIndices[m], reconstructLength); + if (toRead > 0) { + if (reader.getBlockReader() == null) { + reader = null; + m++; + } + } else { + used.set(m); + return m; + } + } + + // step2: if there is no new source DN we can use, try to find a source + // DN we ever read from but because some reason, e.g., slow, it + // is not in the success DN list at the begin of this iteration, so + // we have not tried it in this iteration. Now we have a chance to + // revisit it again. + for (int i = 0; reader == null && i < readers.size(); i++) { + if (!used.get(i)) { + StripedBlockReader stripedReader = readers.get(i); + toRead = getReadLength(liveIndices[i], reconstructLength); + if (toRead > 0) { + stripedReader.closeBlockReader(); + stripedReader.resetBlockReader(reconstructor.getPositionInBlock()); + if (stripedReader.getBlockReader() != null) { + stripedReader.getReadBuffer().position(0); + m = i; + reader = stripedReader; + } + } else { + used.set(i); + stripedReader.getReadBuffer().position(0); + return i; + } + } + } + + // step3: schedule if find a correct source DN and need to do real read. + if (reader != null) { + Callable readCallable = + reader.readFromBlock(toRead, corruptedBlocks); + Future f = readService.submit(readCallable); + futures.put(f, m); + used.set(m); + } + + return -1; + } + + // Cancel all reads. + private static void cancelReads(Collection> futures) { + for (Future future : futures) { + future.cancel(true); + } + } + + void close() { + for (StripedBlockReader reader : readers) { + reader.closeBlockReader(); + } + } + + StripedBlockReader getReader(int i) { + return readers.get(i); + } + + int getBufferSize() { + return bufferSize; + } + + DataChecksum getChecksum() { + return checksum; + } + + void clearBuffers() { + if (zeroStripeBuffers != null) { + for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) { + zeroStripeBuffer.clear(); + } + } + + for (StripedBlockReader reader : readers) { + if (reader.getReadBuffer() != null) { + reader.getReadBuffer().clear(); + } + } + } + + InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { + return reconstructor.getSocketAddress4Transfer(dnInfo); + } + + CachingStrategy getCachingStrategy() { + return reconstructor.getCachingStrategy(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java new file mode 100644 index 0000000000..a0a5f836b6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; + +/** + * StripedReconstructor reconstruct one or more missed striped block in the + * striped block group, the minimum number of live striped blocks should be + * no less than data block number. + * + * | <- Striped Block Group -> | + * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group + * | | | | + * v v v v + * +------+ +------+ +------+ +------+ + * |cell_0| |cell_1| |cell_2| |cell_3| ... + * +------+ +------+ +------+ +------+ + * |cell_4| |cell_5| |cell_6| |cell_7| ... + * +------+ +------+ +------+ +------+ + * |cell_8| |cell_9| |cell10| |cell11| ... + * +------+ +------+ +------+ +------+ + * ... ... ... ... + * + * + * We use following steps to reconstruct striped block group, in each round, we + * reconstruct bufferSize data until finish, the + * bufferSize is configurable and may be less or larger than + * cell size: + * step1: read bufferSize data from minimum number of sources + * required by reconstruction. + * step2: decode data for targets. + * step3: transfer data to targets. + * + * In step1, try to read bufferSize data from minimum number + * of sources , if there is corrupt or stale sources, read from new source + * will be scheduled. The best sources are remembered for next round and + * may be updated in each round. + * + * In step2, typically if source blocks we read are all data blocks, we + * need to call encode, and if there is one parity block, we need to call + * decode. Notice we only read once and reconstruct all missed striped block + * if they are more than one. + * + * In step3, send the reconstructed data to targets by constructing packet + * and send them directly. Same as continuous block replication, we + * don't check the packet ack. Since the datanode doing the reconstruction work + * are one of the source datanodes, so the reconstructed data are sent + * remotely. + * + * There are some points we can do further improvements in next phase: + * 1. we can read the block file directly on the local datanode, + * currently we use remote block reader. (Notice short-circuit is not + * a good choice, see inline comments). + * 2. We need to check the packet ack for EC reconstruction? Since EC + * reconstruction is more expensive than continuous block replication, + * it needs to read from several other datanodes, should we make sure the + * reconstructed result received by targets? + */ +@InterfaceAudience.Private +class StripedReconstructor implements Runnable { + private static final Logger LOG = DataNode.LOG; + + private final ErasureCodingWorker worker; + private final DataNode datanode; + private final Configuration conf; + + private final ErasureCodingPolicy ecPolicy; + + private RawErasureDecoder decoder; + + private final ExtendedBlock blockGroup; + private final BitSet liveBitSet; + + // position in striped internal block + private long positionInBlock; + + private StripedReader stripedReader; + + private StripedWriter stripedWriter; + + private final CachingStrategy cachingStrategy; + + StripedReconstructor(ErasureCodingWorker worker, + BlockECReconstructionInfo reconstructionInfo) { + this.worker = worker; + this.datanode = worker.getDatanode(); + this.conf = worker.getConf(); + + ecPolicy = reconstructionInfo.getErasureCodingPolicy(); + + blockGroup = reconstructionInfo.getExtendedBlock(); + byte[] liveIndices = reconstructionInfo.getLiveBlockIndices(); + liveBitSet = new BitSet(ecPolicy.getNumDataUnits() + + ecPolicy.getNumParityUnits()); + for (int i = 0; i < liveIndices.length; i++) { + liveBitSet.set(liveIndices[i]); + } + + stripedReader = new StripedReader(this, datanode, + conf, reconstructionInfo); + stripedWriter = new StripedWriter(this, datanode, + conf, reconstructionInfo); + + cachingStrategy = CachingStrategy.newDefaultStrategy(); + + positionInBlock = 0L; + } + + BitSet getLiveBitSet() { + return liveBitSet; + } + + ByteBuffer allocateBuffer(int length) { + return ByteBuffer.allocate(length); + } + + ExtendedBlock getBlock(int i) { + return StripedBlockUtil.constructInternalBlock(blockGroup, ecPolicy, i); + } + + long getBlockLen(int i) { + return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(), + ecPolicy, i); + } + + boolean hasValidTargets() { + return stripedWriter.hasValidTargets(); + } + + @Override + public void run() { + datanode.incrementXmitsInProgress(); + try { + stripedReader.init(); + + stripedWriter.init(); + + reconstructAndTransfer(); + + stripedWriter.endTargetBlocks(); + + // Currently we don't check the acks for packets, this is similar as + // block replication. + } catch (Throwable e) { + LOG.warn("Failed to reconstruct striped block: {}", blockGroup, e); + } finally { + datanode.decrementXmitsInProgress(); + + stripedReader.close(); + + stripedWriter.close(); + } + } + + void reconstructAndTransfer() throws IOException { + while (positionInBlock < stripedWriter.getMaxTargetLength()) { + long remaining = stripedWriter.getMaxTargetLength() - positionInBlock; + final int toReconstructLen = + (int) Math.min(stripedReader.getBufferSize(), remaining); + // step1: read from minimum source DNs required for reconstruction. + // The returned success list is the source DNs we do real read from + stripedReader.readMinimumSources(toReconstructLen); + + // step2: decode to reconstruct targets + reconstructTargets(toReconstructLen); + + // step3: transfer data + if (stripedWriter.transferData2Targets() == 0) { + String error = "Transfer failed for all targets."; + throw new IOException(error); + } + + positionInBlock += toReconstructLen; + + clearBuffers(); + } + } + + // Initialize decoder + private void initDecoderIfNecessary() { + if (decoder == null) { + decoder = CodecUtil.createRSRawDecoder(conf, ecPolicy.getNumDataUnits(), + ecPolicy.getNumParityUnits()); + } + } + + private void reconstructTargets(int toReconstructLen) { + initDecoderIfNecessary(); + + ByteBuffer[] inputs = stripedReader.getInputBuffers(toReconstructLen); + + int[] erasedIndices = stripedWriter.getRealTargetIndices(); + ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen); + + decoder.decode(inputs, erasedIndices, outputs); + + stripedWriter.updateRealTargetBuffers(toReconstructLen); + } + + long getPositionInBlock() { + return positionInBlock; + } + + /** + * Clear all associated buffers. + */ + private void clearBuffers() { + stripedReader.clearBuffers(); + + stripedWriter.clearBuffers(); + } + + InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { + return NetUtils.createSocketAddr(dnInfo.getXferAddr( + datanode.getDnConf().getConnectToDnViaHostname())); + } + + int getBufferSize() { + return stripedReader.getBufferSize(); + } + + DataChecksum getChecksum() { + return stripedReader.getChecksum(); + } + + CachingStrategy getCachingStrategy() { + return cachingStrategy; + } + + CompletionService createReadService() { + return new ExecutorCompletionService<>(worker.getStripedReadPool()); + } + + ExtendedBlock getBlockGroup() { + return blockGroup; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java new file mode 100644 index 0000000000..e2052a3970 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; +import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.BitSet; + +/** + * Manage striped writers that writes to a target with reconstructed data. + */ +@InterfaceAudience.Private +class StripedWriter { + private static final Logger LOG = DataNode.LOG; + private final static int WRITE_PACKET_SIZE = 64 * 1024; + + private final StripedReconstructor reconstructor; + private final DataNode datanode; + private final Configuration conf; + + private final int dataBlkNum; + private final int parityBlkNum; + + private boolean[] targetsStatus; + + // targets + private final DatanodeInfo[] targets; + private final short[] targetIndices; + private boolean hasValidTargets; + private final StorageType[] targetStorageTypes; + private long maxTargetLength; + + private StripedBlockWriter[] writers; + + private int maxChunksPerPacket; + private byte[] packetBuf; + private byte[] checksumBuf; + private int bytesPerChecksum; + private int checksumSize; + + StripedWriter(StripedReconstructor reconstructor, + DataNode datanode, + Configuration conf, + BlockECReconstructionInfo reconstructionInfo) { + this.reconstructor = reconstructor; + this.datanode = datanode; + this.conf = conf; + + ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy(); + dataBlkNum = ecPolicy.getNumDataUnits(); + parityBlkNum = ecPolicy.getNumParityUnits(); + + targets = reconstructionInfo.getTargetDnInfos(); + targetStorageTypes = reconstructionInfo.getTargetStorageTypes(); + + writers = new StripedBlockWriter[targets.length]; + + targetIndices = new short[targets.length]; + Preconditions.checkArgument(targetIndices.length <= parityBlkNum, + "Too much missed striped blocks."); + initTargetIndices(); + + maxTargetLength = 0L; + for (short targetIndex : targetIndices) { + maxTargetLength = Math.max(maxTargetLength, + reconstructor.getBlockLen(targetIndex)); + } + + // targetsStatus store whether some target is success, it will record + // any failed target once, if some target failed (invalid DN or transfer + // failed), will not transfer data to it any more. + targetsStatus = new boolean[targets.length]; + } + + void init() throws IOException { + DataChecksum checksum = reconstructor.getChecksum(); + checksumSize = checksum.getChecksumSize(); + bytesPerChecksum = checksum.getBytesPerChecksum(); + int chunkSize = bytesPerChecksum + checksumSize; + maxChunksPerPacket = Math.max( + (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize, 1); + int maxPacketSize = chunkSize * maxChunksPerPacket + + PacketHeader.PKT_MAX_HEADER_LEN; + + packetBuf = new byte[maxPacketSize]; + int tmpLen = checksumSize * + (reconstructor.getBufferSize() / bytesPerChecksum); + checksumBuf = new byte[tmpLen]; + + if (initTargetStreams() == 0) { + String error = "All targets are failed."; + throw new IOException(error); + } + } + + private void initTargetIndices() { + BitSet bitset = reconstructor.getLiveBitSet(); + + int m = 0; + int k = 0; + hasValidTargets = false; + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { + if (!bitset.get(i)) { + if (reconstructor.getBlockLen(i) > 0) { + if (m < targets.length) { + targetIndices[m++] = (short)i; + hasValidTargets = true; + } + } + } + } + } + + /** + * Send reconstructed data to targets. + */ + int transferData2Targets() { + int nSuccess = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + boolean success = false; + try { + writers[i].transferData2Target(packetBuf); + nSuccess++; + success = true; + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + targetsStatus[i] = success; + } + } + return nSuccess; + } + + /** + * Send an empty packet to mark the end of the block. + */ + void endTargetBlocks() { + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + try { + writers[i].endTargetBlock(packetBuf); + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + } + } + } + + /** + * Initialize output/input streams for transferring data to target + * and send create block request. + */ + int initTargetStreams() { + int nSuccess = 0; + for (short i = 0; i < targets.length; i++) { + try { + writers[i] = createWriter(i); + nSuccess++; + targetsStatus[i] = true; + } catch (Throwable e) { + LOG.warn(e.getMessage()); + } + } + return nSuccess; + } + + private StripedBlockWriter createWriter(short index) throws IOException { + return new StripedBlockWriter(this, datanode, conf, + reconstructor.getBlock(targetIndices[index]), targets[index], + targetStorageTypes[index]); + } + + ByteBuffer allocateWriteBuffer() { + return reconstructor.allocateBuffer(reconstructor.getBufferSize()); + } + + int getTargets() { + return targets.length; + } + + private int getRealTargets() { + int m = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + m++; + } + } + return m; + } + + int[] getRealTargetIndices() { + int realTargets = getRealTargets(); + int[] results = new int[realTargets]; + int m = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + results[m++] = targetIndices[i]; + } + } + return results; + } + + ByteBuffer[] getRealTargetBuffers(int toReconstructLen) { + int numGood = getRealTargets(); + ByteBuffer[] outputs = new ByteBuffer[numGood]; + int m = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + writers[i].getTargetBuffer().limit(toReconstructLen); + outputs[m++] = writers[i].getTargetBuffer(); + } + } + return outputs; + } + + void updateRealTargetBuffers(int toReconstructLen) { + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + long blockLen = reconstructor.getBlockLen(targetIndices[i]); + long remaining = blockLen - reconstructor.getPositionInBlock(); + if (remaining <= 0) { + writers[i].getTargetBuffer().limit(0); + } else if (remaining < toReconstructLen) { + writers[i].getTargetBuffer().limit((int)remaining); + } + } + } + } + + long getMaxTargetLength() { + return maxTargetLength; + } + + byte[] getChecksumBuf() { + return checksumBuf; + } + + int getBytesPerChecksum() { + return bytesPerChecksum; + } + + int getChecksumSize() { + return checksumSize; + } + + DataChecksum getChecksum() { + return reconstructor.getChecksum(); + } + + int getMaxChunksPerPacket() { + return maxChunksPerPacket; + } + + CachingStrategy getCachingStrategy() { + return reconstructor.getCachingStrategy(); + } + + InetSocketAddress getSocketAddress4Transfer(DatanodeInfo target) { + return reconstructor.getSocketAddress4Transfer(target); + } + + boolean hasValidTargets() { + return hasValidTargets; + } + + /** + * Clear all buffers. + */ + void clearBuffers() { + for (StripedBlockWriter writer : writers) { + ByteBuffer targetBuffer = writer.getTargetBuffer(); + if (targetBuffer != null) { + targetBuffer.clear(); + } + } + } + + void close() { + for (int i = 0; i < targets.length; i++) { + writers[i].close(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java new file mode 100644 index 0000000000..3150fcedb0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Datanode side striping + erasure coding related task processing. + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.datanode.erasurecode; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java index 38ca8ceb8c..7155e74463 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -230,22 +230,23 @@ private void shutdownDataNode(DataNode dn) throws IOException { private int generateErrors(Map corruptTargets, ReconstructionType type) throws IOException { - int stoppedDN = 0; - for (Map.Entry target : corruptTargets.entrySet()) { - if (stoppedDN == 0 || type != ReconstructionType.DataOnly + int stoppedDNs = 0; + for (Map.Entry target : + corruptTargets.entrySet()) { + if (stoppedDNs == 0 || type != ReconstructionType.DataOnly || random.nextBoolean()) { // stop at least one DN to trigger reconstruction LOG.info("Note: stop DataNode " + target.getValue().getDisplayName() + " with internal block " + target.getKey()); shutdownDataNode(target.getValue()); - stoppedDN++; + stoppedDNs++; } else { // corrupt the data on the DN LOG.info("Note: corrupt data on " + target.getValue().getDisplayName() + " with internal block " + target.getKey()); cluster.corruptReplica(target.getValue(), target.getKey()); } } - return stoppedDN; + return stoppedDNs; } /**