From 91c741a2a171129638071306482c019d007972ab Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Tue, 7 Apr 2015 11:20:13 -0700 Subject: [PATCH] HDFS-7782. Erasure coding: pread from files in striped layout. Contributed by Zhe Zhang and Jing Zhao --- .../hadoop/hdfs/protocol/LocatedBlock.java | 4 + .../org/apache/hadoop/hdfs/DFSClient.java | 55 +++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +- .../apache/hadoop/hdfs/DFSInputStream.java | 80 +++- .../hadoop/hdfs/DFSStripedInputStream.java | 367 ++++++++++++++++++ .../hdfs/protocol/LocatedStripedBlock.java | 5 + .../blockmanagement/BlockInfoStriped.java | 6 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 91 ++++- .../hadoop/hdfs/TestReadStripedFile.java | 304 +++++++++++++++ .../namenode/TestRecoverStripedBlocks.java | 88 +---- 10 files changed, 896 insertions(+), 112 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java index 4e8f2025be..a9596bf67d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java @@ -203,4 +203,8 @@ public String toString() { + "; locs=" + Arrays.asList(locs) + "}"; } + + public boolean isStriped() { + return false; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 0c6383b4dc..a845fdfbf4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -238,6 +238,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private static final DFSHedgedReadMetrics HEDGED_READ_METRIC = new DFSHedgedReadMetrics(); private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; + private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL; private final Sampler traceSampler; private final int smallBufferSize; @@ -380,6 +381,19 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) { this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize()); } + numThreads = conf.getInt( + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE, + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE); + if (numThreads <= 0) { + LOG.warn("The value of " + + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE + + " must be greater than 0. The current setting is " + numThreads + + ". 
Reset it to the default value " + + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE); + numThreads = + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE; + } + this.initThreadsNumForStripedReads(numThreads); this.saslClient = new SaslDataTransferClient( conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); @@ -3193,11 +3207,52 @@ public void rejectedExecution(Runnable runnable, LOG.debug("Using hedged reads; pool threads=" + num); } } + + /** + * Create thread pool for parallel reading in striped layout, + * STRIPED_READ_THREAD_POOL, if it does not already exist. + * @param num Number of threads for striped reads thread pool. + */ + private void initThreadsNumForStripedReads(int num) { + assert num > 0; + if (STRIPED_READ_THREAD_POOL != null) { + return; + } + synchronized (DFSClient.class) { + if (STRIPED_READ_THREAD_POOL == null) { + STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, + TimeUnit.SECONDS, new SynchronousQueue(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedRead-" + threadIndex.getAndIncrement()); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { + LOG.info("Execution for striped reading rejected, " + + "Executing in current thread"); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }); + STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); + } + } + } ThreadPoolExecutor getHedgedReadsThreadPool() { return HEDGED_READ_THREAD_POOL; } + ThreadPoolExecutor getStripedReadsThreadPool() { + return STRIPED_READ_THREAD_POOL; + } + boolean isHedgedReadsEnabled() { return (HEDGED_READ_THREAD_POOL != null) && HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index bd86964a17..6bc005bd14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -670,7 +670,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.reject-unresolved-dn-topology-mapping"; public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT = false; - + + public static final String DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE = + "dfs.client.striped.read.threadpool.size"; + // With default 3+2 schema, each normal read could span 3 DNs. So this + // default value accommodates 6 read streams + public static final int DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE = 18; + // Slow io warning log threshold settings for dfsclient and datanode. 
public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY = "dfs.datanode.slow.io.warning.threshold.ms"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index d1e0b9a593..72725c43ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ByteBufferReadable; @@ -94,7 +95,7 @@ public class DFSInputStream extends FSInputStream @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; private long hedgedReadOpsLoopNumForTesting = 0; - private final DFSClient dfsClient; + protected final DFSClient dfsClient; private AtomicBoolean closed = new AtomicBoolean(false); private final String src; private final boolean verifyChecksum; @@ -441,7 +442,7 @@ public List getAllBlocks() throws IOException { * @return located block * @throws IOException */ - private LocatedBlock getBlockAt(long offset) throws IOException { + protected LocatedBlock getBlockAt(long offset) throws IOException { synchronized(infoLock) { assert (locatedBlocks != null) : "locatedBlocks is null"; @@ -713,7 +714,7 @@ public synchronized int read() throws IOException { * Wraps different possible read implementations so that readBuffer can be * strategy-agnostic. */ - private interface ReaderStrategy { + interface ReaderStrategy { public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException; } @@ -1058,7 +1059,7 @@ private static String getBestNodeDNAddrPairErrorString( return errMsgr.toString(); } - private void fetchBlockByteRange(long blockStartOffset, long start, long end, + protected void fetchBlockByteRange(long blockStartOffset, long start, long end, byte[] buf, int offset, Map> corruptedBlockMap) throws IOException { @@ -1100,13 +1101,42 @@ public ByteBuffer call() throws Exception { }; } + /** + * Used when reading contiguous blocks + */ private void actualGetFromOneDataNode(final DNAddrPair datanode, long blockStartOffset, final long start, final long end, byte[] buf, int offset, Map> corruptedBlockMap) throws IOException { + final int length = (int) (end - start + 1); + actualGetFromOneDataNode(datanode, block, start, end, buf, + new int[]{offset}, new int[]{length}, corruptedBlockMap); + } + + /** + * Read data from one DataNode. + * @param datanode the datanode from which to read data + * @param block the block to read + * @param startInBlk the startInBlk offset of the block + * @param endInBlk the endInBlk offset of the block + * @param buf the given byte array into which the data is read + * @param offsets the data may be read into multiple segments of the buf + * (when reading a striped block). this array indicates the + * offset of each buf segment. 
+ * @param lengths the length of each buf segment + * @param corruptedBlockMap map recording list of datanodes with corrupted + * block replica + */ + void actualGetFromOneDataNode(final DNAddrPair datanode, + LocatedBlock block, final long startInBlk, final long endInBlk, + byte[] buf, int[] offsets, int[] lengths, + Map> corruptedBlockMap) + throws IOException { DFSClientFaultInjector.get().startFetchFromDatanode(); int refetchToken = 1; // only need to get a new access token once int refetchEncryptionKey = 1; // only need to get a new encryption key once + final int len = (int) (endInBlk - startInBlk + 1); + checkReadPortions(offsets, lengths, len); while (true) { // cached block locations may have been updated by chooseDataNode() @@ -1116,15 +1146,15 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode, BlockReader reader = null; try { DFSClientFaultInjector.get().fetchFromDatanodeException(); - int len = (int) (end - start + 1); reader = getBlockReader(block, start, len, datanode.addr, datanode.storageType, datanode.info); - int nread = reader.readAll(buf, offset, len); - updateReadStatistics(readStatistics, nread, reader); - - if (nread != len) { - throw new IOException("truncated return from reader.read(): " + - "excpected " + len + ", got " + nread); + for (int i = 0; i < offsets.length; i++) { + int nread = reader.readAll(buf, offsets[i], lengths[i]); + updateReadStatistics(readStatistics, nread, reader); + if (nread != len) { + throw new IOException("truncated return from reader.read(): " + + "excpected " + len + ", got " + nread); + } } DFSClientFaultInjector.get().readFromDatanodeDelay(); return; @@ -1169,7 +1199,26 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode, } /** - * Like {@link #fetchBlockByteRange} except we start up a second, parallel, + * This method verifies that the read portions are valid and do not overlap + * with each other. + */ + private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) { + Preconditions.checkArgument(offsets.length == lengths.length && + offsets.length > 0); + int sum = 0; + for (int i = 0; i < lengths.length; i++) { + if (i > 0) { + int gap = offsets[i] - offsets[i - 1]; + // make sure read portions do not overlap with each other + Preconditions.checkArgument(gap >= lengths[i - 1]); + } + sum += lengths[i]; + } + Preconditions.checkArgument(sum == totalLen); + } + + /** + * Like {@link #fetchBlockByteRange}except we start up a second, parallel, * 'hedged' read if the first read is taking longer than configured amount of * time. We then wait on which ever read returns first. */ @@ -1388,10 +1437,9 @@ private int pread(long position, byte[] buffer, int offset, int length) long targetStart = position - blk.getStartOffset(); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); try { - if (dfsClient.isHedgedReadsEnabled()) { + if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) { hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart, - targetStart + bytesToRead - 1, buffer, offset, - corruptedBlockMap); + targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap); } else { fetchBlockByteRange(blk.getStartOffset(), targetStart, targetStart + bytesToRead - 1, buffer, offset, @@ -1587,7 +1635,7 @@ public void reset() throws IOException { } /** Utility class to encapsulate data node info and its address. 
*/ - private static final class DNAddrPair { + static final class DNAddrPair { final DatanodeInfo info; final InetSocketAddress addr; final StorageType storageType; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java new file mode 100644 index 0000000000..077b0f8554 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; +import org.apache.hadoop.net.NetUtils; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + + +/****************************************************************************** + * DFSStripedInputStream reads from striped block groups, illustrated below: + * + * | <- Striped Block Group -> | + * blk_0 blk_1 blk_2 <- A striped block group has + * | | | {@link #groupSize} blocks + * v v v + * +------+ +------+ +------+ + * |cell_0| |cell_1| |cell_2| <- The logical read order should be + * +------+ +------+ +------+ cell_0, cell_1, ... + * |cell_3| |cell_4| |cell_5| + * +------+ +------+ +------+ + * |cell_6| |cell_7| |cell_8| + * +------+ +------+ +------+ + * |cell_9| + * +------+ <- A cell contains {@link #cellSize} bytes of data + * + * Three styles of read will eventually be supported: + * 1. Stateful read: TODO: HDFS-8033 + * 2. pread without decode support + * This is implemented by calculating the portion of read from each block and + * issuing requests to each DataNode in parallel. + * 3. 
pread with decode support: TODO: will be supported after HDFS-7678 + *****************************************************************************/ +public class DFSStripedInputStream extends DFSInputStream { + /** + * This method plans the read portion from each block in the stripe + * @param groupSize The size / width of the striping group + * @param cellSize The size of each striping cell + * @param startInBlk Starting offset in the striped block + * @param len Length of the read request + * @param bufOffset Initial offset in the result buffer + * @return array of {@link ReadPortion}, each representing the portion of I/O + * for an individual block in the group + */ + @VisibleForTesting + static ReadPortion[] planReadPortions(final int groupSize, + final int cellSize, final long startInBlk, final int len, int bufOffset) { + ReadPortion[] results = new ReadPortion[groupSize]; + for (int i = 0; i < groupSize; i++) { + results[i] = new ReadPortion(); + } + + // cellIdxInBlk is the index of the cell in the block + // E.g., cell_3 is the 2nd cell in blk_0 + int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize)); + + // blkIdxInGroup is the index of the block in the striped block group + // E.g., blk_2 is the 3rd block in the group + final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize); + results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk + + startInBlk % cellSize; + boolean crossStripe = false; + for (int i = 1; i < groupSize; i++) { + if (blkIdxInGroup + i >= groupSize && !crossStripe) { + cellIdxInBlk++; + crossStripe = true; + } + results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock = + cellSize * cellIdxInBlk; + } + + int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len); + results[blkIdxInGroup].offsetsInBuf.add(bufOffset); + results[blkIdxInGroup].lengths.add(firstCellLen); + results[blkIdxInGroup].readLength += firstCellLen; + + int i = (blkIdxInGroup + 1) % groupSize; + for (int done = firstCellLen; done < len; done += cellSize) { + ReadPortion rp = results[i]; + rp.offsetsInBuf.add(done + bufOffset); + final int readLen = Math.min(len - done, cellSize); + rp.lengths.add(readLen); + rp.readLength += readLen; + i = (i + 1) % groupSize; + } + return results; + } + + /** + * This method parses a striped block group into individual blocks. 
+ * + * @param bg The striped block group + * @param dataBlkNum the number of data blocks + * @return An array containing the blocks in the group + */ + @VisibleForTesting + static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg, + int dataBlkNum, int cellSize) { + int locatedBGSize = bg.getBlockIndices().length; + // TODO not considering missing blocks for now, only identify data blocks + LocatedBlock[] lbs = new LocatedBlock[dataBlkNum]; + for (short i = 0; i < locatedBGSize; i++) { + final int idx = bg.getBlockIndices()[i]; + if (idx < dataBlkNum && lbs[idx] == null) { + lbs[idx] = constructInternalBlock(bg, i, cellSize, idx); + } + } + return lbs; + } + + private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, + int idxInReturnedLocs, int cellSize, int idxInBlockGroup) { + final ExtendedBlock blk = new ExtendedBlock(bg.getBlock()); + blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup); + // TODO: fix the numBytes computation + + return new LocatedBlock(blk, + new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, + new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, + new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, + bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(), + null); + } + + + private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS; + + DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum) + throws IOException { + super(dfsClient, src, verifyChecksum); + DFSClient.LOG.debug("Creating an striped input stream for file " + src); + } + + @Override + public synchronized int read(final ByteBuffer buf) throws IOException { + throw new UnsupportedActionException("Stateful read is not supported"); + } + + @Override + public synchronized int read(final byte buf[], int off, int len) + throws IOException { + throw new UnsupportedActionException("Stateful read is not supported"); + } + + /** + * | <--------- LocatedStripedBlock (ID = 0) ---------> | + * LocatedBlock (0) | LocatedBlock (1) | LocatedBlock (2) + * ^ + * offset + * On a striped file, the super method {@link DFSInputStream#getBlockAt} + * treats a striped block group as a single {@link LocatedBlock} object, + * which includes target in its range. This method adds the logic of: + * 1. Analyzing the index of required block based on offset + * 2. Parsing the block group to obtain the block location on that index + */ + @Override + protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException { + LocatedBlock lb = super.getBlockAt(blkStartOffset); + assert lb instanceof LocatedStripedBlock : "NameNode should return a " + + "LocatedStripedBlock for a striped file"; + + int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize) + % groupSize); + // If indexing information is returned, iterate through the index array + // to find the entry for position idx in the group + LocatedStripedBlock lsb = (LocatedStripedBlock) lb; + int i = 0; + for (; i < lsb.getBlockIndices().length; i++) { + if (lsb.getBlockIndices()[i] == idx) { + break; + } + } + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("getBlockAt for striped blocks, offset=" + + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx); + } + return constructInternalBlock(lsb, i, cellSize, idx); + } + + private LocatedBlock getBlockGroupAt(long offset) throws IOException { + return super.getBlockAt(offset); + } + + /** + * Real implementation of pread. 
+ */ + @Override + protected void fetchBlockByteRange(LocatedBlock block, long start, + long end, byte[] buf, int offset, + Map> corruptedBlockMap) + throws IOException { + Map, Integer> futures = new HashMap<>(); + CompletionService stripedReadsService = + new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool()); + int len = (int) (end - start + 1); + + // Refresh the striped block group + block = getBlockGroupAt(block.getStartOffset()); + assert block instanceof LocatedStripedBlock : "NameNode" + + " should return a LocatedStripedBlock for a striped file"; + LocatedStripedBlock blockGroup = (LocatedStripedBlock) block; + + // Planning the portion of I/O for each shard + ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start, + len, offset); + + // Parse group to get chosen DN location + LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize); + + for (short i = 0; i < groupSize; i++) { + ReadPortion rp = readPortions[i]; + if (rp.readLength <= 0) { + continue; + } + DatanodeInfo loc = blks[i].getLocations()[0]; + StorageType type = blks[i].getStorageTypes()[0]; + DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr( + loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type); + Callable readCallable = getFromOneDataNode(dnAddr, blks[i], + rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf, + rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i); + Future getFromDNRequest = stripedReadsService.submit(readCallable); + DFSClient.LOG.debug("Submitting striped read request for " + blks[i]); + futures.put(getFromDNRequest, (int) i); + } + while (!futures.isEmpty()) { + try { + waitNextCompletion(stripedReadsService, futures); + } catch (InterruptedException ie) { + // Ignore and retry + } + } + } + + private Callable getFromOneDataNode(final DNAddrPair datanode, + final LocatedBlock block, final long start, final long end, + final byte[] buf, final int[] offsets, final int[] lengths, + final Map> corruptedBlockMap, + final int hedgedReadId) { + final Span parentSpan = Trace.currentSpan(); + return new Callable() { + @Override + public Void call() throws Exception { + TraceScope scope = + Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan); + try { + actualGetFromOneDataNode(datanode, block, start, + end, buf, offsets, lengths, corruptedBlockMap); + } finally { + scope.close(); + } + return null; + } + }; + } + + private void waitNextCompletion(CompletionService stripedReadsService, + Map, Integer> futures) throws InterruptedException { + if (futures.isEmpty()) { + throw new InterruptedException("Futures already empty"); + } + Future future = null; + try { + future = stripedReadsService.take(); + future.get(); + futures.remove(future); + } catch (ExecutionException | CancellationException e) { + // already logged in the Callable + futures.remove(future); + } + throw new InterruptedException("let's retry"); + } + + public void setCellSize(int cellSize) { + this.cellSize = cellSize; + } + + /** + * This class represents the portion of I/O associated with each block in the + * striped block group. 
+ */ + static class ReadPortion { + /** + * startOffsetInBlock + * | + * v + * |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->| + * +------------------+------------------+----------------+ + * | cell_0 | cell_3 | cell_6 | <- blk_0 + * +------------------+------------------+----------------+ + * _/ \_______________________ + * | | + * v offsetsInBuf[0] v offsetsInBuf[1] + * +------------------------------------------------------+ + * | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf + * | (partial) | (from blk_1 and blk_2) | | + * +------------------------------------------------------+ + */ + private long startOffsetInBlock = 0; + private long readLength = 0; + private final List offsetsInBuf = new ArrayList<>(); + private final List lengths = new ArrayList<>(); + + int[] getOffsets() { + int[] offsets = new int[offsetsInBuf.size()]; + for (int i = 0; i < offsets.length; i++) { + offsets[i] = offsetsInBuf.get(i); + } + return offsets; + } + + int[] getLengths() { + int[] lens = new int[this.lengths.size()]; + for (int i = 0; i < lens.length; i++) { + lens[i] = this.lengths.get(i); + } + return lens; + } + + long getReadLength() { + return readLength; + } + + long getStartOffsetInBlock() { + return startOffsetInBlock; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java index 97e3a6936f..98614db446 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java @@ -65,4 +65,9 @@ public String toString() { public int[] getBlockIndices() { return this.blockIndices; } + + @Override + public boolean isStriped() { + return true; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index 4a85efbcbc..20b0c5c4a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -23,7 +23,7 @@ import java.io.DataOutput; import java.io.IOException; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; /** * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. @@ -203,8 +203,8 @@ public long spaceConsumed() { // In case striped blocks, total usage by this striped blocks should // be the total of data blocks and parity blocks because // `getNumBytes` is the total of actual data block size. 
- return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CHUNK_SIZE) + 1) - * BLOCK_STRIPED_CHUNK_SIZE * parityBlockNum + getNumBytes(); + return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CELL_SIZE) + 1) + * BLOCK_STRIPED_CELL_SIZE * parityBlockNum + getNumBytes(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 53a99b05eb..24943816bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -66,6 +66,12 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; @@ -102,6 +108,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LayoutVersion; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -124,8 +131,10 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.namenode.FSDirectory; import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; @@ -134,7 +143,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; -import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.io.IOUtils; @@ -156,12 +164,8 @@ import org.mockito.internal.util.reflection.Whitebox; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; /** Utilities for HDFS tests */ public class DFSTestUtil { @@ -1846,4 +1850,77 @@ public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock( reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks); return reports; } + + public static void createECFile(MiniDFSCluster cluster, Path file, Path dir, + 
int numBlocks, int numStripesPerBlk) throws Exception { + DistributedFileSystem dfs = cluster.getFileSystem(); + dfs.mkdirs(dir); + dfs.getClient().createErasureCodingZone(dir.toString()); + + FSDataOutputStream out = null; + try { + out = dfs.create(file, (short) 1); // create an empty file + + FSNamesystem ns = cluster.getNamesystem(); + FSDirectory fsdir = ns.getFSDirectory(); + INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile(); + + ExtendedBlock previous = null; + for (int i = 0; i < numBlocks; i++) { + Block newBlock = createBlock(cluster.getDataNodes(), dfs, ns, + file.toString(), fileNode, dfs.getClient().getClientName(), + previous, numStripesPerBlk); + previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock); + } + + dfs.getClient().namenode.complete(file.toString(), + dfs.getClient().getClientName(), previous, fileNode.getId()); + } finally { + IOUtils.cleanup(null, out); + } + } + + static Block createBlock(List dataNodes, DistributedFileSystem fs, + FSNamesystem ns, String file, INodeFile fileNode, String clientName, + ExtendedBlock previous, int numStripes) throws Exception { + fs.getClient().namenode.addBlock(file, clientName, previous, null, + fileNode.getId(), null); + + final BlockInfo lastBlock = fileNode.getLastBlock(); + final int groupSize = fileNode.getBlockReplication(); + // 1. RECEIVING_BLOCK IBR + int i = 0; + for (DataNode dn : dataNodes) { + if (i < groupSize) { + final Block block = new Block(lastBlock.getBlockId() + i++, 0, + lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); + } + } + } + + // 2. RECEIVED_BLOCK IBR + i = 0; + for (DataNode dn : dataNodes) { + if (i < groupSize) { + final Block block = new Block(lastBlock.getBlockId() + i++, + numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); + } + } + } + + lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS); + return lastBlock; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java new file mode 100644 index 0000000000..0032bdd47d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class TestReadStripedFile { + + public static final Log LOG = LogFactory.getLog(TestReadStripedFile.class); + + private MiniDFSCluster cluster; + private Configuration conf = new Configuration(); + private DistributedFileSystem fs; + private final Path dirPath = new Path("/striped"); + private Path filePath = new Path(dirPath, "file"); + private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS; + private final short TOTAL_SIZE = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS; + private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final int NUM_STRIPE_PER_BLOCK = 2; + private final int BLOCKSIZE = 2 * GROUP_SIZE * CELLSIZE; + + @Before + public void setup() throws IOException { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE); + SimulatedFSDataset.setFactory(conf); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(TOTAL_SIZE) + .build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private void testPlanReadPortions(int startInBlk, int length, + int bufferOffset, int[] readLengths, int[] offsetsInBlock, + int[][] bufferOffsets, int[][] bufferLengths) { + ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE, + CELLSIZE, startInBlk, length, bufferOffset); + assertEquals(GROUP_SIZE, results.length); + + for (int i = 0; i < GROUP_SIZE; i++) { + assertEquals(readLengths[i], results[i].getReadLength()); + assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock()); + final int[] bOffsets = results[i].getOffsets(); + assertArrayEquals(bufferOffsets[i], bOffsets); + final int[] bLengths = results[i].getLengths(); + assertArrayEquals(bufferLengths[i], bLengths); + } + } + + /** + * Test {@link DFSStripedInputStream#planReadPortions} + */ + @Test + public void testPlanReadPortions() { + 
/** + * start block offset is 0, read cellSize - 10 + */ + testPlanReadPortions(0, CELLSIZE - 10, 0, + new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0}, + new int[][]{new int[]{0}, new int[]{}, new int[]{}}, + new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}}); + + /** + * start block offset is 0, read 3 * cellSize + */ + testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0, + new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0}, + new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}}, + new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}}); + + /** + * start block offset is 0, read cellSize + 10 + */ + testPlanReadPortions(0, CELLSIZE + 10, 0, + new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0}, + new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}}, + new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}}); + + /** + * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100 + */ + testPlanReadPortions(0, 5 * CELLSIZE + 10, 100, + new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0}, + new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE}, + new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4}, + new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}}, + new int[][]{new int[]{CELLSIZE, CELLSIZE}, + new int[]{CELLSIZE, CELLSIZE}, + new int[]{CELLSIZE, 10}}); + + /** + * start block offset is 2, read 3 * cellSize + */ + testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100, + new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, + new int[]{2, 0, 0}, + new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2}, + new int[]{100 + CELLSIZE - 2}, + new int[]{100 + CELLSIZE * 2 - 2}}, + new int[][]{new int[]{CELLSIZE - 2, 2}, + new int[]{CELLSIZE}, + new int[]{CELLSIZE}}); + + /** + * start block offset is 2, read 3 * cellSize + 10 + */ + testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0, + new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE}, + new int[]{2, 0, 0}, + new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2}, + new int[]{CELLSIZE - 2}, + new int[]{CELLSIZE * 2 - 2}}, + new int[][]{new int[]{CELLSIZE - 2, 12}, + new int[]{CELLSIZE}, + new int[]{CELLSIZE}}); + + /** + * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10 + */ + testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0, + new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2}, + new int[]{CELLSIZE, CELLSIZE - 1, 0}, + new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1}, + new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}, + new int[]{1, 3 * CELLSIZE + 1}}, + new int[][]{new int[]{CELLSIZE, CELLSIZE}, + new int[]{1, CELLSIZE, 9}, + new int[]{CELLSIZE, CELLSIZE}}); + + /** + * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10 + */ + testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0, + new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1}, + new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1}, + new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1}, + new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1}, + new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}}, + new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, + new int[]{CELLSIZE, CELLSIZE, 9}, + new int[]{1, CELLSIZE, CELLSIZE}}); + } + + private LocatedStripedBlock createDummyLocatedBlock() { + final long blockGroupID = -1048576; + DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE]; + String[] storageIDs = new String[TOTAL_SIZE]; + StorageType[] storageTypes = new StorageType[TOTAL_SIZE]; + int[] indices = new int[TOTAL_SIZE]; + for 
(int i = 0; i < TOTAL_SIZE; i++) { + locs[i] = new DatanodeInfo(cluster.getDataNodes().get(i).getDatanodeId()); + storageIDs[i] = cluster.getDataNodes().get(i).getDatanodeUuid(); + storageTypes[i] = StorageType.DISK; + indices[i] = (i + 2) % GROUP_SIZE; + } + return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID), + locs, storageIDs, storageTypes, indices, 0, false, null); + } + + @Test + public void testParseDummyStripedBlock() { + LocatedStripedBlock lsb = createDummyLocatedBlock(); + LocatedBlock[] blocks = DFSStripedInputStream.parseStripedBlockGroup( + lsb, GROUP_SIZE, CELLSIZE); + assertEquals(GROUP_SIZE, blocks.length); + for (int j = 0; j < GROUP_SIZE; j++) { + assertFalse(blocks[j].isStriped()); + assertEquals(j, + BlockIdManager.getBlockIndex(blocks[j].getBlock().getLocalBlock())); + assertEquals(j * CELLSIZE, blocks[j].getStartOffset()); + } + } + + @Test + public void testParseStripedBlock() throws Exception { + final int numBlocks = 4; + DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, + NUM_STRIPE_PER_BLOCK); + LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( + filePath.toString(), 0, BLOCKSIZE * numBlocks); + + assertEquals(4, lbs.locatedBlockCount()); + List lbList = lbs.getLocatedBlocks(); + for (LocatedBlock lb : lbList) { + assertTrue(lb.isStriped()); + } + + for (int i = 0; i < numBlocks; i++) { + LocatedStripedBlock lsb = (LocatedStripedBlock) (lbs.get(i)); + LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb, + GROUP_SIZE, CELLSIZE); + assertEquals(GROUP_SIZE, blks.length); + for (int j = 0; j < GROUP_SIZE; j++) { + assertFalse(blks[j].isStriped()); + assertEquals(j, + BlockIdManager.getBlockIndex(blks[j].getBlock().getLocalBlock())); + assertEquals(i * BLOCKSIZE + j * CELLSIZE, blks[j].getStartOffset()); + } + } + } + + /** + * Test {@link DFSStripedInputStream#getBlockAt(long)} + */ + @Test + public void testGetBlock() throws Exception { + final int numBlocks = 4; + DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, + NUM_STRIPE_PER_BLOCK); + LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( + filePath.toString(), 0, BLOCKSIZE * numBlocks); + final DFSStripedInputStream in = + new DFSStripedInputStream(fs.getClient(), filePath.toString(), false); + + List lbList = lbs.getLocatedBlocks(); + for (LocatedBlock aLbList : lbList) { + LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList; + LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb, + GROUP_SIZE, CELLSIZE); + for (int j = 0; j < GROUP_SIZE; j++) { + LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset()); + assertEquals(blks[j].getBlock(), refreshed.getBlock()); + assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset()); + assertArrayEquals(blks[j].getLocations(), refreshed.getLocations()); + } + } + } + + @Test + public void testPread() throws Exception { + final int numBlocks = 4; + DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, + NUM_STRIPE_PER_BLOCK); + LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( + filePath.toString(), 0, BLOCKSIZE); + + assert lbs.get(0) instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0)); + for (int i = 0; i < GROUP_SIZE; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, BLOCKSIZE, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); + } + 
DFSStripedInputStream in = + new DFSStripedInputStream(fs.getClient(), filePath.toString(), false); + in.setCellSize(CELLSIZE); + int readSize = BLOCKSIZE; + byte[] readBuffer = new byte[readSize]; + int ret = in.read(0, readBuffer, 0, readSize); + + assertEquals(readSize, ret); + // TODO: verify read results with patterned data from HDFS-8117 + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java index d965ae7213..b2ff6c881d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java @@ -18,15 +18,11 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; @@ -36,19 +32,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; -import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; -import org.apache.hadoop.io.IOUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; -import java.util.UUID; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -84,83 +75,10 @@ public void tearDown() throws Exception { } } - public static void createECFile(MiniDFSCluster cluster, Path file, Path dir, - int numBlocks) throws Exception { - DistributedFileSystem dfs = cluster.getFileSystem(); - dfs.mkdirs(dir); - dfs.getClient().getNamenode().createErasureCodingZone(dir.toString()); - - FSDataOutputStream out = null; - try { - out = dfs.create(file, (short) 1); // create an empty file - - FSNamesystem ns = cluster.getNamesystem(); - FSDirectory fsdir = ns.getFSDirectory(); - INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile(); - - ExtendedBlock previous = null; - for (int i = 0; i < numBlocks; i++) { - Block newBlock = createBlock(cluster.getDataNodes(), ns, - file.toString(), fileNode, dfs.getClient().getClientName(), - previous); - previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock); - } - - ns.completeFile(file.toString(), dfs.getClient().getClientName(), - previous, fileNode.getId()); - } 
finally { - IOUtils.cleanup(null, out); - } - } - - static Block createBlock(List dataNodes, FSNamesystem ns, - String file, INodeFile fileNode, String clientName, - ExtendedBlock previous) throws Exception { - ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null, - null); - - final BlockInfo lastBlock = fileNode.getLastBlock(); - final int groupSize = fileNode.getBlockReplication(); - // 1. RECEIVING_BLOCK IBR - int i = 0; - for (DataNode dn : dataNodes) { - if (i < groupSize) { - final Block block = new Block(lastBlock.getBlockId() + i++, 0, - lastBlock.getGenerationStamp()); - DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); - StorageReceivedDeletedBlocks[] reports = DFSTestUtil - .makeReportForReceivedBlock(block, - ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage); - for (StorageReceivedDeletedBlocks report : reports) { - ns.processIncrementalBlockReport(dn.getDatanodeId(), report); - } - } - } - - // 2. RECEIVED_BLOCK IBR - i = 0; - for (DataNode dn : dataNodes) { - if (i < groupSize) { - final Block block = new Block(lastBlock.getBlockId() + i++, - BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp()); - DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); - StorageReceivedDeletedBlocks[] reports = DFSTestUtil - .makeReportForReceivedBlock(block, - ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); - for (StorageReceivedDeletedBlocks report : reports) { - ns.processIncrementalBlockReport(dn.getDatanodeId(), report); - } - } - } - - lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS); - return lastBlock; - } - @Test public void testMissingStripedBlock() throws Exception { final int numBlocks = 4; - createECFile(cluster, filePath, dirPath, numBlocks); + DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, 1); // make sure the file is complete in NN final INodeFile fileNode = cluster.getNamesystem().getFSDirectory() @@ -172,7 +90,7 @@ public void testMissingStripedBlock() throws Exception { for (BlockInfo blk : blocks) { assertTrue(blk.isStriped()); assertTrue(blk.isComplete()); - assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes()); + assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes()); final BlockInfoStriped sb = (BlockInfoStriped) blk; assertEquals(GROUP_SIZE, sb.numNodes()); }
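
The heart of this change is DFSStripedInputStream#planReadPortions, which maps a pread of len bytes starting at startInBlk within a block group onto per-block read portions: the first (possibly partial) cell goes to the block that contains startInBlk, and the remaining cells are assigned round-robin to the other data blocks, bumping the cell index once the stripe wraps. Below is a minimal standalone sketch of that arithmetic so it can be traced outside HDFS; the class name, the Portion fields, and the 3-data-block / 64 KB-cell parameters in main() are assumptions made for the demo, not HDFS constants.

    // StripedReadPlanDemo.java -- illustrative sketch, not part of the patch.
    // Mirrors the cell-to-block mapping of DFSStripedInputStream#planReadPortions.
    import java.util.ArrayList;
    import java.util.List;

    public class StripedReadPlanDemo {

      /** Per-block slice of a pread, mirroring DFSStripedInputStream.ReadPortion. */
      static class Portion {
        long startOffsetInBlock;      // first byte to read inside this internal block
        final List<Integer> bufOffsets = new ArrayList<>(); // where each cell lands in the user buffer
        final List<Integer> lengths = new ArrayList<>();    // bytes contributed by each cell
        long readLength;              // total bytes read from this block
      }

      static Portion[] plan(int groupSize, int cellSize, long startInBlk,
                            int len, int bufOffset) {
        Portion[] results = new Portion[groupSize];
        for (int i = 0; i < groupSize; i++) {
          results[i] = new Portion();
        }
        // Index of the cell (stripe row) within each block where the read starts.
        int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));
        // Index of the block in the group that holds the first requested byte.
        final int blkIdx = (int) (startInBlk / cellSize % groupSize);
        results[blkIdx].startOffsetInBlock = cellSize * cellIdxInBlk + startInBlk % cellSize;
        boolean crossStripe = false;
        for (int i = 1; i < groupSize; i++) {
          if (blkIdx + i >= groupSize && !crossStripe) {
            cellIdxInBlk++;          // later blocks start reading from the next stripe row
            crossStripe = true;
          }
          results[(blkIdx + i) % groupSize].startOffsetInBlock = cellSize * cellIdxInBlk;
        }
        // First (possibly partial) cell belongs to the block containing startInBlk.
        int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
        results[blkIdx].bufOffsets.add(bufOffset);
        results[blkIdx].lengths.add(firstCellLen);
        results[blkIdx].readLength += firstCellLen;
        // Remaining cells rotate round-robin through the data blocks.
        int i = (blkIdx + 1) % groupSize;
        for (int done = firstCellLen; done < len; done += cellSize) {
          int readLen = Math.min(len - done, cellSize);
          results[i].bufOffsets.add(done + bufOffset);
          results[i].lengths.add(readLen);
          results[i].readLength += readLen;
          i = (i + 1) % groupSize;
        }
        return results;
      }

      public static void main(String[] args) {
        // Demo parameters: 3 data blocks, 64 KB cells, read 5*cellSize+10 bytes from offset 2.
        int groupSize = 3, cellSize = 64 * 1024;
        Portion[] portions = plan(groupSize, cellSize, 2, 5 * cellSize + 10, 0);
        for (int b = 0; b < groupSize; b++) {
          System.out.println("blk_" + b + ": startOffsetInBlock="
              + portions[b].startOffsetInBlock + ", readLength=" + portions[b].readLength
              + ", bufOffsets=" + portions[b].bufOffsets + ", lengths=" + portions[b].lengths);
        }
      }
    }

Running the demo shows the same shape of plan exercised by TestReadStripedFile#testPlanReadPortions: one partial first cell on the starting block, full cells rotating through the other data blocks, and a short tail cell at the end of the request.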
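
A related piece of arithmetic lives in parseStripedBlockGroup/constructInternalBlock: each data block's ID is the group's block ID plus its index in the group, and its start offset is the group's start offset plus index * cellSize (the numBytes computation is still a TODO in the patch). The short sketch below isolates just that math; the group ID is the dummy value used in TestReadStripedFile, and the 64 KB cell size and 3-block group are assumptions for the demo.

    // InternalBlockMathDemo.java -- illustrative sketch, not part of the patch.
    public class InternalBlockMathDemo {
      public static void main(String[] args) {
        long groupId = -1048576L;      // dummy block-group ID from TestReadStripedFile
        long groupStartOffset = 0L;    // file offset where the block group begins
        int cellSize = 64 * 1024;      // assumed cell size for the demo
        int dataBlkNum = 3;            // 3 data blocks, as in the default 3+2 schema
        for (int idx = 0; idx < dataBlkNum; idx++) {
          long internalBlockId = groupId + idx;                 // blockId + idxInBlockGroup
          long startOffset = groupStartOffset + idx * cellSize; // bg.getStartOffset() + idx * cellSize
          System.out.println("data block " + idx + ": id=" + internalBlockId
              + ", startOffset=" + startOffset);
        }
      }
    }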