Merge branch 'trunk' into HADOOP-12756
This commit is contained in:
commit
846c5ceb3a
@ -85,7 +85,7 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
|
||||
private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
|
||||
return direct ? directBuffers : buffers;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized ByteBuffer getBuffer(boolean direct, int length) {
|
||||
TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
|
||||
|
@ -29,6 +29,9 @@ public class ECChunk {
|
||||
|
||||
private ByteBuffer chunkBuffer;
|
||||
|
||||
// TODO: should be in a more general flags
|
||||
private boolean allZero = false;
|
||||
|
||||
/**
|
||||
* Wrapping a ByteBuffer
|
||||
* @param buffer buffer to be wrapped by the chunk
|
||||
@ -37,6 +40,13 @@ public class ECChunk {
|
||||
this.chunkBuffer = buffer;
|
||||
}
|
||||
|
||||
public ECChunk(ByteBuffer buffer, int offset, int len) {
|
||||
ByteBuffer tmp = buffer.duplicate();
|
||||
tmp.position(offset);
|
||||
tmp.limit(offset + len);
|
||||
this.chunkBuffer = tmp.slice();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapping a bytes array
|
||||
* @param buffer buffer to be wrapped by the chunk
|
||||
@ -45,6 +55,18 @@ public class ECChunk {
|
||||
this.chunkBuffer = ByteBuffer.wrap(buffer);
|
||||
}
|
||||
|
||||
public ECChunk(byte[] buffer, int offset, int len) {
|
||||
this.chunkBuffer = ByteBuffer.wrap(buffer, offset, len);
|
||||
}
|
||||
|
||||
public boolean isAllZero() {
|
||||
return allZero;
|
||||
}
|
||||
|
||||
public void setAllZero(boolean allZero) {
|
||||
this.allZero = allZero;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert to ByteBuffer
|
||||
* @return ByteBuffer
|
||||
|
@ -115,6 +115,9 @@ final class CoderUtil {
|
||||
buffers[i] = null;
|
||||
} else {
|
||||
buffers[i] = chunk.getBuffer();
|
||||
if (chunk.isAllZero()) {
|
||||
CoderUtil.resetBuffer(buffers[i], buffers[i].remaining());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -528,7 +528,7 @@ extends AbstractDelegationTokenIdentifier>
|
||||
DataInputStream in = new DataInputStream(buf);
|
||||
TokenIdent id = createIdentifier();
|
||||
id.readFields(in);
|
||||
LOG.info("Token cancelation requested for identifier: "+id);
|
||||
LOG.info("Token cancellation requested for identifier: " + id);
|
||||
|
||||
if (id.getUser() == null) {
|
||||
throw new InvalidToken("Token with no owner");
|
||||
|
@ -240,7 +240,7 @@ public class DFSInputStream extends FSInputStream
|
||||
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
|
||||
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
|
||||
while (oldIter.hasNext() && newIter.hasNext()) {
|
||||
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
|
||||
if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
|
||||
throw new IOException("Blocklist for " + src + " has changed!");
|
||||
}
|
||||
}
|
||||
@ -677,8 +677,8 @@ public class DFSInputStream extends FSInputStream
|
||||
if (oneByteBuf == null) {
|
||||
oneByteBuf = new byte[1];
|
||||
}
|
||||
int ret = read( oneByteBuf, 0, 1 );
|
||||
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
||||
int ret = read(oneByteBuf, 0, 1);
|
||||
return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
|
||||
}
|
||||
|
||||
/* This is a used by regular read() and handles ChecksumExceptions.
|
||||
@ -702,7 +702,7 @@ public class DFSInputStream extends FSInputStream
|
||||
// retry as many times as seekToNewSource allows.
|
||||
try {
|
||||
return reader.readFromBlock(blockReader, len);
|
||||
} catch ( ChecksumException ce ) {
|
||||
} catch (ChecksumException ce) {
|
||||
DFSClient.LOG.warn("Found Checksum error for "
|
||||
+ getCurrentBlock() + " from " + currentNode
|
||||
+ " at " + ce.getPos());
|
||||
@ -710,7 +710,7 @@ public class DFSInputStream extends FSInputStream
|
||||
retryCurrentNode = false;
|
||||
// we want to remember which block replicas we have tried
|
||||
corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode);
|
||||
} catch ( IOException e ) {
|
||||
} catch (IOException e) {
|
||||
if (!retryCurrentNode) {
|
||||
DFSClient.LOG.warn("Exception while reading from "
|
||||
+ getCurrentBlock() + " of " + src + " from "
|
||||
@ -779,7 +779,9 @@ public class DFSInputStream extends FSInputStream
|
||||
DFSClient.LOG.warn("DFS Read", e);
|
||||
}
|
||||
blockEnd = -1;
|
||||
if (currentNode != null) { addToDeadNodes(currentNode); }
|
||||
if (currentNode != null) {
|
||||
addToDeadNodes(currentNode);
|
||||
}
|
||||
if (--retries == 0) {
|
||||
throw e;
|
||||
}
|
||||
@ -1397,10 +1399,10 @@ public class DFSInputStream extends FSInputStream
|
||||
|
||||
@Override
|
||||
public long skip(long n) throws IOException {
|
||||
if ( n > 0 ) {
|
||||
if (n > 0) {
|
||||
long curPos = getPos();
|
||||
long fileLen = getFileLength();
|
||||
if( n+curPos > fileLen ) {
|
||||
if (n+curPos > fileLen) {
|
||||
n = fileLen - curPos;
|
||||
}
|
||||
seek(curPos+n);
|
||||
@ -1550,7 +1552,7 @@ public class DFSInputStream extends FSInputStream
|
||||
* Get statistics about the reads which this DFSInputStream has done.
|
||||
*/
|
||||
public ReadStatistics getReadStatistics() {
|
||||
return new ReadStatistics(readStatistics);
|
||||
return readStatistics;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -17,24 +17,21 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.ChecksumException;
|
||||
import org.apache.hadoop.fs.ReadOption;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||
import org.apache.hadoop.hdfs.StripeReader.BlockReaderInfo;
|
||||
import org.apache.hadoop.hdfs.StripeReader.ReaderRetryPolicy;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripeRange;
|
||||
import org.apache.hadoop.io.ByteBufferPool;
|
||||
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
|
||||
|
||||
import org.apache.hadoop.io.ElasticByteBufferPool;
|
||||
import org.apache.hadoop.io.erasurecode.CodecUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
@ -44,7 +41,6 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -53,111 +49,32 @@ import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* DFSStripedInputStream reads from striped block groups
|
||||
* DFSStripedInputStream reads from striped block groups.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DFSStripedInputStream extends DFSInputStream {
|
||||
|
||||
private static class ReaderRetryPolicy {
|
||||
private int fetchEncryptionKeyTimes = 1;
|
||||
private int fetchTokenTimes = 1;
|
||||
|
||||
void refetchEncryptionKey() {
|
||||
fetchEncryptionKeyTimes--;
|
||||
}
|
||||
|
||||
void refetchToken() {
|
||||
fetchTokenTimes--;
|
||||
}
|
||||
|
||||
boolean shouldRefetchEncryptionKey() {
|
||||
return fetchEncryptionKeyTimes > 0;
|
||||
}
|
||||
|
||||
boolean shouldRefetchToken() {
|
||||
return fetchTokenTimes > 0;
|
||||
}
|
||||
}
|
||||
|
||||
/** Used to indicate the buffered data's range in the block group */
|
||||
private static class StripeRange {
|
||||
/** start offset in the block group (inclusive) */
|
||||
final long offsetInBlock;
|
||||
/** length of the stripe range */
|
||||
final long length;
|
||||
|
||||
StripeRange(long offsetInBlock, long length) {
|
||||
Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
|
||||
this.offsetInBlock = offsetInBlock;
|
||||
this.length = length;
|
||||
}
|
||||
|
||||
boolean include(long pos) {
|
||||
return pos >= offsetInBlock && pos < offsetInBlock + length;
|
||||
}
|
||||
}
|
||||
|
||||
private static class BlockReaderInfo {
|
||||
final BlockReader reader;
|
||||
final DatanodeInfo datanode;
|
||||
/**
|
||||
* when initializing block readers, their starting offsets are set to the same
|
||||
* number: the smallest internal block offsets among all the readers. This is
|
||||
* because it is possible that for some internal blocks we have to read
|
||||
* "backwards" for decoding purpose. We thus use this offset array to track
|
||||
* offsets for all the block readers so that we can skip data if necessary.
|
||||
*/
|
||||
long blockReaderOffset;
|
||||
/**
|
||||
* We use this field to indicate whether we should use this reader. In case
|
||||
* we hit any issue with this reader, we set this field to true and avoid
|
||||
* using it for the next stripe.
|
||||
*/
|
||||
boolean shouldSkip = false;
|
||||
|
||||
BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
|
||||
this.reader = reader;
|
||||
this.datanode = dn;
|
||||
this.blockReaderOffset = offset;
|
||||
}
|
||||
|
||||
void setOffset(long offset) {
|
||||
this.blockReaderOffset = offset;
|
||||
}
|
||||
|
||||
void skip() {
|
||||
this.shouldSkip = true;
|
||||
}
|
||||
}
|
||||
|
||||
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
|
||||
|
||||
private final BlockReaderInfo[] blockReaders;
|
||||
private final int cellSize;
|
||||
private final short dataBlkNum;
|
||||
private final short parityBlkNum;
|
||||
private final int groupSize;
|
||||
/** the buffer for a complete stripe */
|
||||
/** the buffer for a complete stripe. */
|
||||
private ByteBuffer curStripeBuf;
|
||||
private ByteBuffer parityBuf;
|
||||
private final ErasureCodingPolicy ecPolicy;
|
||||
private final RawErasureDecoder decoder;
|
||||
|
||||
/**
|
||||
* indicate the start/end offset of the current buffered stripe in the
|
||||
* block group
|
||||
* Indicate the start/end offset of the current buffered stripe in the
|
||||
* block group.
|
||||
*/
|
||||
private StripeRange curStripeRange;
|
||||
private final CompletionService<Void> readingService;
|
||||
|
||||
/**
|
||||
* When warning the user of a lost block in striping mode, we remember the
|
||||
@ -167,8 +84,8 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
*
|
||||
* To minimize the overhead, we only store the datanodeUuid in this set
|
||||
*/
|
||||
private final Set<String> warnedNodes = Collections.newSetFromMap(
|
||||
new ConcurrentHashMap<String, Boolean>());
|
||||
private final Set<String> warnedNodes =
|
||||
Collections.newSetFromMap(new ConcurrentHashMap<>());
|
||||
|
||||
DFSStripedInputStream(DFSClient dfsClient, String src,
|
||||
boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
|
||||
@ -183,8 +100,6 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
groupSize = dataBlkNum + parityBlkNum;
|
||||
blockReaders = new BlockReaderInfo[groupSize];
|
||||
curStripeRange = new StripeRange(0, 0);
|
||||
readingService =
|
||||
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
|
||||
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
|
||||
dataBlkNum, parityBlkNum);
|
||||
decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
|
||||
@ -198,7 +113,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
return decoder.preferDirectBuffer();
|
||||
}
|
||||
|
||||
private void resetCurStripeBuffer() {
|
||||
void resetCurStripeBuffer() {
|
||||
if (curStripeBuf == null) {
|
||||
curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
|
||||
cellSize * dataBlkNum);
|
||||
@ -207,7 +122,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
curStripeRange = new StripeRange(0, 0);
|
||||
}
|
||||
|
||||
private ByteBuffer getParityBuffer() {
|
||||
protected ByteBuffer getParityBuffer() {
|
||||
if (parityBuf == null) {
|
||||
parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
|
||||
cellSize * parityBlkNum);
|
||||
@ -216,6 +131,29 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
return parityBuf;
|
||||
}
|
||||
|
||||
protected ByteBuffer getCurStripeBuf() {
|
||||
return curStripeBuf;
|
||||
}
|
||||
|
||||
protected String getSrc() {
|
||||
return src;
|
||||
}
|
||||
|
||||
protected DFSClient getDFSClient() {
|
||||
return dfsClient;
|
||||
}
|
||||
|
||||
protected LocatedBlocks getLocatedBlocks() {
|
||||
return locatedBlocks;
|
||||
}
|
||||
|
||||
protected ByteBufferPool getBufferPool() {
|
||||
return BUFFER_POOL;
|
||||
}
|
||||
|
||||
protected ThreadPoolExecutor getStripedReadsThreadPool(){
|
||||
return dfsClient.getStripedReadsThreadPool();
|
||||
}
|
||||
/**
|
||||
* When seeking into a new block group, create blockReader for each internal
|
||||
* block in the group.
|
||||
@ -268,7 +206,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
blockEnd = -1;
|
||||
}
|
||||
|
||||
private void closeReader(BlockReaderInfo readerInfo) {
|
||||
protected void closeReader(BlockReaderInfo readerInfo) {
|
||||
if (readerInfo != null) {
|
||||
if (readerInfo.reader != null) {
|
||||
try {
|
||||
@ -288,6 +226,59 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
return pos - currentLocatedBlock.getStartOffset();
|
||||
}
|
||||
|
||||
boolean createBlockReader(LocatedBlock block, long offsetInBlock,
|
||||
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
|
||||
int chunkIndex) throws IOException {
|
||||
BlockReader reader = null;
|
||||
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
|
||||
DFSInputStream.DNAddrPair dnInfo =
|
||||
new DFSInputStream.DNAddrPair(null, null, null);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
// the cached block location might have been re-fetched, so always
|
||||
// get it from cache.
|
||||
block = refreshLocatedBlock(block);
|
||||
targetBlocks[chunkIndex] = block;
|
||||
|
||||
// internal block has one location, just rule out the deadNodes
|
||||
dnInfo = getBestNodeDNAddrPair(block, null);
|
||||
if (dnInfo == null) {
|
||||
break;
|
||||
}
|
||||
reader = getBlockReader(block, offsetInBlock,
|
||||
block.getBlockSize() - offsetInBlock,
|
||||
dnInfo.addr, dnInfo.storageType, dnInfo.info);
|
||||
} catch (IOException e) {
|
||||
if (e instanceof InvalidEncryptionKeyException &&
|
||||
retry.shouldRefetchEncryptionKey()) {
|
||||
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
|
||||
+ "encryption key was invalid when connecting to " + dnInfo.addr
|
||||
+ " : " + e);
|
||||
dfsClient.clearDataEncryptionKey();
|
||||
retry.refetchEncryptionKey();
|
||||
} else if (retry.shouldRefetchToken() &&
|
||||
tokenRefetchNeeded(e, dnInfo.addr)) {
|
||||
fetchBlockAt(block.getStartOffset());
|
||||
retry.refetchToken();
|
||||
} else {
|
||||
//TODO: handles connection issues
|
||||
DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
|
||||
"block" + block.getBlock(), e);
|
||||
// re-fetch the block in case the block has been moved
|
||||
fetchBlockAt(block.getStartOffset());
|
||||
addToDeadNodes(dnInfo.info);
|
||||
}
|
||||
}
|
||||
if (reader != null) {
|
||||
readerInfos[chunkIndex] =
|
||||
new BlockReaderInfo(reader, dnInfo.info, offsetInBlock);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a new stripe covering the current position, and store the data in the
|
||||
* {@link #curStripeBuf}.
|
||||
@ -303,20 +294,20 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
|
||||
final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
|
||||
- (stripeIndex * stripeLen), stripeLen);
|
||||
StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
|
||||
stripeLimit - stripeBufOffset);
|
||||
StripeRange stripeRange =
|
||||
new StripeRange(offsetInBlockGroup, stripeLimit - stripeBufOffset);
|
||||
|
||||
LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
|
||||
AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
|
||||
cellSize, blockGroup, offsetInBlockGroup,
|
||||
offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
|
||||
offsetInBlockGroup + stripeRange.getLength() - 1, curStripeBuf);
|
||||
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
|
||||
blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
||||
// read the whole stripe
|
||||
for (AlignedStripe stripe : stripes) {
|
||||
// Parse group to get chosen DN location
|
||||
StripeReader sreader = new StatefulStripeReader(readingService, stripe,
|
||||
blks, blockReaders, corruptedBlocks);
|
||||
StripeReader sreader = new StatefulStripeReader(stripe, ecPolicy, blks,
|
||||
blockReaders, corruptedBlocks, decoder, this);
|
||||
sreader.readStripe();
|
||||
}
|
||||
curStripeBuf.position(stripeBufOffset);
|
||||
@ -324,69 +315,8 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
curStripeRange = stripeRange;
|
||||
}
|
||||
|
||||
private Callable<Void> readCells(final BlockReader reader,
|
||||
final DatanodeInfo datanode, final long currentReaderOffset,
|
||||
final long targetReaderOffset, final ByteBufferStrategy[] strategies,
|
||||
final ExtendedBlock currentBlock,
|
||||
final CorruptedBlocks corruptedBlocks) {
|
||||
return new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
// reader can be null if getBlockReaderWithRetry failed or
|
||||
// the reader hit exception before
|
||||
if (reader == null) {
|
||||
throw new IOException("The BlockReader is null. " +
|
||||
"The BlockReader creation failed or the reader hit exception.");
|
||||
}
|
||||
Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
|
||||
if (currentReaderOffset < targetReaderOffset) {
|
||||
long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
|
||||
Preconditions.checkState(
|
||||
skipped == targetReaderOffset - currentReaderOffset);
|
||||
}
|
||||
int result = 0;
|
||||
for (ByteBufferStrategy strategy : strategies) {
|
||||
result += readToBuffer(reader, datanode, strategy, currentBlock,
|
||||
corruptedBlocks);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private int readToBuffer(BlockReader blockReader,
|
||||
DatanodeInfo currentNode, ByteBufferStrategy strategy,
|
||||
ExtendedBlock currentBlock,
|
||||
CorruptedBlocks corruptedBlocks)
|
||||
throws IOException {
|
||||
final int targetLength = strategy.getTargetLength();
|
||||
int length = 0;
|
||||
try {
|
||||
while (length < targetLength) {
|
||||
int ret = strategy.readFromBlock(blockReader);
|
||||
if (ret < 0) {
|
||||
throw new IOException("Unexpected EOS from the reader");
|
||||
}
|
||||
length += ret;
|
||||
}
|
||||
return length;
|
||||
} catch (ChecksumException ce) {
|
||||
DFSClient.LOG.warn("Found Checksum error for "
|
||||
+ currentBlock + " from " + currentNode
|
||||
+ " at " + ce.getPos());
|
||||
// we want to remember which block replicas we have tried
|
||||
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
|
||||
throw ce;
|
||||
} catch (IOException e) {
|
||||
DFSClient.LOG.warn("Exception while reading from "
|
||||
+ currentBlock + " of " + src + " from "
|
||||
+ currentNode, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Seek to a new arbitrary location
|
||||
* Seek to a new arbitrary location.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void seek(long targetPos) throws IOException {
|
||||
@ -469,7 +399,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy the data from {@link #curStripeBuf} into the given buffer
|
||||
* Copy the data from {@link #curStripeBuf} into the given buffer.
|
||||
* @param strategy the ReaderStrategy containing the given buffer
|
||||
* @param length target length
|
||||
* @return number of bytes copied
|
||||
@ -530,17 +460,19 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
|
||||
AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
|
||||
ecPolicy, cellSize, blockGroup, start, end, buf);
|
||||
CompletionService<Void> readService = new ExecutorCompletionService<>(
|
||||
dfsClient.getStripedReadsThreadPool());
|
||||
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
|
||||
blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
||||
final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
|
||||
try {
|
||||
for (AlignedStripe stripe : stripes) {
|
||||
// Parse group to get chosen DN location
|
||||
StripeReader preader = new PositionStripeReader(readService, stripe,
|
||||
blks, preaderInfos, corruptedBlocks);
|
||||
preader.readStripe();
|
||||
StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks,
|
||||
preaderInfos, corruptedBlocks, decoder, this);
|
||||
try {
|
||||
preader.readStripe();
|
||||
} finally {
|
||||
preader.close();
|
||||
}
|
||||
}
|
||||
buf.position(buf.position() + (int)(end - start + 1));
|
||||
} finally {
|
||||
@ -570,376 +502,6 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The reader for reading a complete {@link AlignedStripe}. Note that an
|
||||
* {@link AlignedStripe} may cross multiple stripes with cellSize width.
|
||||
*/
|
||||
private abstract class StripeReader {
|
||||
final Map<Future<Void>, Integer> futures = new HashMap<>();
|
||||
final AlignedStripe alignedStripe;
|
||||
final CompletionService<Void> service;
|
||||
final LocatedBlock[] targetBlocks;
|
||||
final CorruptedBlocks corruptedBlocks;
|
||||
final BlockReaderInfo[] readerInfos;
|
||||
|
||||
StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
|
||||
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
|
||||
CorruptedBlocks corruptedBlocks) {
|
||||
this.service = service;
|
||||
this.alignedStripe = alignedStripe;
|
||||
this.targetBlocks = targetBlocks;
|
||||
this.readerInfos = readerInfos;
|
||||
this.corruptedBlocks = corruptedBlocks;
|
||||
}
|
||||
|
||||
/** prepare all the data chunks */
|
||||
abstract void prepareDecodeInputs();
|
||||
|
||||
/** prepare the parity chunk and block reader if necessary */
|
||||
abstract boolean prepareParityChunk(int index);
|
||||
|
||||
abstract void decode();
|
||||
|
||||
void updateState4SuccessRead(StripingChunkReadResult result) {
|
||||
Preconditions.checkArgument(
|
||||
result.state == StripingChunkReadResult.SUCCESSFUL);
|
||||
readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
|
||||
+ alignedStripe.getSpanInBlock());
|
||||
}
|
||||
|
||||
private void checkMissingBlocks() throws IOException {
|
||||
if (alignedStripe.missingChunksNum > parityBlkNum) {
|
||||
clearFutures(futures.keySet());
|
||||
throw new IOException(alignedStripe.missingChunksNum
|
||||
+ " missing blocks, the stripe is: " + alignedStripe
|
||||
+ "; locatedBlocks is: " + locatedBlocks);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We need decoding. Thus go through all the data chunks and make sure we
|
||||
* submit read requests for all of them.
|
||||
*/
|
||||
private void readDataForDecoding() throws IOException {
|
||||
prepareDecodeInputs();
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
Preconditions.checkNotNull(alignedStripe.chunks[i]);
|
||||
if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
|
||||
if (!readChunk(targetBlocks[i], i)) {
|
||||
alignedStripe.missingChunksNum++;
|
||||
}
|
||||
}
|
||||
}
|
||||
checkMissingBlocks();
|
||||
}
|
||||
|
||||
void readParityChunks(int num) throws IOException {
|
||||
for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
|
||||
i++) {
|
||||
if (alignedStripe.chunks[i] == null) {
|
||||
if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
|
||||
j++;
|
||||
} else {
|
||||
alignedStripe.missingChunksNum++;
|
||||
}
|
||||
}
|
||||
}
|
||||
checkMissingBlocks();
|
||||
}
|
||||
|
||||
boolean createBlockReader(LocatedBlock block, int chunkIndex)
|
||||
throws IOException {
|
||||
BlockReader reader = null;
|
||||
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
|
||||
DNAddrPair dnInfo = new DNAddrPair(null, null, null);
|
||||
|
||||
while(true) {
|
||||
try {
|
||||
// the cached block location might have been re-fetched, so always
|
||||
// get it from cache.
|
||||
block = refreshLocatedBlock(block);
|
||||
targetBlocks[chunkIndex] = block;
|
||||
|
||||
// internal block has one location, just rule out the deadNodes
|
||||
dnInfo = getBestNodeDNAddrPair(block, null);
|
||||
if (dnInfo == null) {
|
||||
break;
|
||||
}
|
||||
reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
|
||||
block.getBlockSize() - alignedStripe.getOffsetInBlock(),
|
||||
dnInfo.addr, dnInfo.storageType, dnInfo.info);
|
||||
} catch (IOException e) {
|
||||
if (e instanceof InvalidEncryptionKeyException &&
|
||||
retry.shouldRefetchEncryptionKey()) {
|
||||
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
|
||||
+ "encryption key was invalid when connecting to " + dnInfo.addr
|
||||
+ " : " + e);
|
||||
dfsClient.clearDataEncryptionKey();
|
||||
retry.refetchEncryptionKey();
|
||||
} else if (retry.shouldRefetchToken() &&
|
||||
tokenRefetchNeeded(e, dnInfo.addr)) {
|
||||
fetchBlockAt(block.getStartOffset());
|
||||
retry.refetchToken();
|
||||
} else {
|
||||
//TODO: handles connection issues
|
||||
DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
|
||||
"block" + block.getBlock(), e);
|
||||
// re-fetch the block in case the block has been moved
|
||||
fetchBlockAt(block.getStartOffset());
|
||||
addToDeadNodes(dnInfo.info);
|
||||
}
|
||||
}
|
||||
if (reader != null) {
|
||||
readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info,
|
||||
alignedStripe.getOffsetInBlock());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
|
||||
if (chunk.useByteBuffer()) {
|
||||
ByteBufferStrategy strategy = new ByteBufferStrategy(
|
||||
chunk.getByteBuffer(), readStatistics, dfsClient);
|
||||
return new ByteBufferStrategy[]{strategy};
|
||||
} else {
|
||||
ByteBufferStrategy[] strategies =
|
||||
new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
|
||||
for (int i = 0; i < strategies.length; i++) {
|
||||
ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
|
||||
strategies[i] =
|
||||
new ByteBufferStrategy(buffer, readStatistics, dfsClient);
|
||||
}
|
||||
return strategies;
|
||||
}
|
||||
}
|
||||
|
||||
boolean readChunk(final LocatedBlock block, int chunkIndex)
|
||||
throws IOException {
|
||||
final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
|
||||
if (block == null) {
|
||||
chunk.state = StripingChunk.MISSING;
|
||||
return false;
|
||||
}
|
||||
if (readerInfos[chunkIndex] == null) {
|
||||
if (!createBlockReader(block, chunkIndex)) {
|
||||
chunk.state = StripingChunk.MISSING;
|
||||
return false;
|
||||
}
|
||||
} else if (readerInfos[chunkIndex].shouldSkip) {
|
||||
chunk.state = StripingChunk.MISSING;
|
||||
return false;
|
||||
}
|
||||
|
||||
chunk.state = StripingChunk.PENDING;
|
||||
Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
|
||||
readerInfos[chunkIndex].datanode,
|
||||
readerInfos[chunkIndex].blockReaderOffset,
|
||||
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
|
||||
block.getBlock(), corruptedBlocks);
|
||||
|
||||
Future<Void> request = service.submit(readCallable);
|
||||
futures.put(request, chunkIndex);
|
||||
return true;
|
||||
}
|
||||
|
||||
/** read the whole stripe. do decoding if necessary */
|
||||
void readStripe() throws IOException {
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
if (alignedStripe.chunks[i] != null &&
|
||||
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
|
||||
if (!readChunk(targetBlocks[i], i)) {
|
||||
alignedStripe.missingChunksNum++;
|
||||
}
|
||||
}
|
||||
}
|
||||
// There are missing block locations at this stage. Thus we need to read
|
||||
// the full stripe and one more parity block.
|
||||
if (alignedStripe.missingChunksNum > 0) {
|
||||
checkMissingBlocks();
|
||||
readDataForDecoding();
|
||||
// read parity chunks
|
||||
readParityChunks(alignedStripe.missingChunksNum);
|
||||
}
|
||||
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
|
||||
|
||||
// Input buffers for potential decode operation, which remains null until
|
||||
// first read failure
|
||||
while (!futures.isEmpty()) {
|
||||
try {
|
||||
StripingChunkReadResult r = StripedBlockUtil
|
||||
.getNextCompletedStripedRead(service, futures, 0);
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
|
||||
+ alignedStripe);
|
||||
}
|
||||
StripingChunk returnedChunk = alignedStripe.chunks[r.index];
|
||||
Preconditions.checkNotNull(returnedChunk);
|
||||
Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
|
||||
|
||||
if (r.state == StripingChunkReadResult.SUCCESSFUL) {
|
||||
returnedChunk.state = StripingChunk.FETCHED;
|
||||
alignedStripe.fetchedChunksNum++;
|
||||
updateState4SuccessRead(r);
|
||||
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
|
||||
clearFutures(futures.keySet());
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
returnedChunk.state = StripingChunk.MISSING;
|
||||
// close the corresponding reader
|
||||
closeReader(readerInfos[r.index]);
|
||||
|
||||
final int missing = alignedStripe.missingChunksNum;
|
||||
alignedStripe.missingChunksNum++;
|
||||
checkMissingBlocks();
|
||||
|
||||
readDataForDecoding();
|
||||
readParityChunks(alignedStripe.missingChunksNum - missing);
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
String err = "Read request interrupted";
|
||||
DFSClient.LOG.error(err);
|
||||
clearFutures(futures.keySet());
|
||||
// Don't decode if read interrupted
|
||||
throw new InterruptedIOException(err);
|
||||
}
|
||||
}
|
||||
|
||||
if (alignedStripe.missingChunksNum > 0) {
|
||||
decode();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class PositionStripeReader extends StripeReader {
|
||||
private ByteBuffer[] decodeInputs = null;
|
||||
|
||||
PositionStripeReader(CompletionService<Void> service,
|
||||
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
|
||||
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
|
||||
super(service, alignedStripe, targetBlocks, readerInfos,
|
||||
corruptedBlocks);
|
||||
}
|
||||
|
||||
@Override
|
||||
void prepareDecodeInputs() {
|
||||
if (decodeInputs == null) {
|
||||
decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe,
|
||||
dataBlkNum, parityBlkNum);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean prepareParityChunk(int index) {
|
||||
Preconditions.checkState(index >= dataBlkNum &&
|
||||
alignedStripe.chunks[index] == null);
|
||||
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void decode() {
|
||||
StripedBlockUtil.finalizeDecodeInputs(decodeInputs, alignedStripe);
|
||||
StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
|
||||
dataBlkNum, parityBlkNum, decoder);
|
||||
}
|
||||
}
|
||||
|
||||
class StatefulStripeReader extends StripeReader {
|
||||
ByteBuffer[] decodeInputs;
|
||||
|
||||
StatefulStripeReader(CompletionService<Void> service,
|
||||
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
|
||||
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
|
||||
super(service, alignedStripe, targetBlocks, readerInfos,
|
||||
corruptedBlocks);
|
||||
}
|
||||
|
||||
@Override
|
||||
void prepareDecodeInputs() {
|
||||
if (decodeInputs == null) {
|
||||
decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
|
||||
final ByteBuffer cur;
|
||||
synchronized (DFSStripedInputStream.this) {
|
||||
cur = curStripeBuf.duplicate();
|
||||
}
|
||||
StripedBlockUtil.VerticalRange range = alignedStripe.range;
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
cur.limit(cur.capacity());
|
||||
int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
|
||||
cur.position(pos);
|
||||
cur.limit((int) (pos + range.spanInBlock));
|
||||
decodeInputs[i] = cur.slice();
|
||||
if (alignedStripe.chunks[i] == null) {
|
||||
alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean prepareParityChunk(int index) {
|
||||
Preconditions.checkState(index >= dataBlkNum
|
||||
&& alignedStripe.chunks[index] == null);
|
||||
if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
|
||||
alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
|
||||
// we have failed the block reader before
|
||||
return false;
|
||||
}
|
||||
final int parityIndex = index - dataBlkNum;
|
||||
ByteBuffer buf = getParityBuffer().duplicate();
|
||||
buf.position(cellSize * parityIndex);
|
||||
buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
|
||||
decodeInputs[index] = buf.slice();
|
||||
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void decode() {
|
||||
final int span = (int) alignedStripe.getSpanInBlock();
|
||||
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||
if (alignedStripe.chunks[i] != null &&
|
||||
alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
|
||||
for (int j = 0; j < span; j++) {
|
||||
decodeInputs[i].put((byte) 0);
|
||||
}
|
||||
decodeInputs[i].flip();
|
||||
} else if (alignedStripe.chunks[i] != null &&
|
||||
alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
|
||||
decodeInputs[i].position(0);
|
||||
decodeInputs[i].limit(span);
|
||||
}
|
||||
}
|
||||
int[] decodeIndices = new int[parityBlkNum];
|
||||
int pos = 0;
|
||||
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||
if (alignedStripe.chunks[i] != null &&
|
||||
alignedStripe.chunks[i].state == StripingChunk.MISSING) {
|
||||
if (i < dataBlkNum) {
|
||||
decodeIndices[pos++] = i;
|
||||
} else {
|
||||
decodeInputs[i] = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
decodeIndices = Arrays.copyOf(decodeIndices, pos);
|
||||
|
||||
final int decodeChunkNum = decodeIndices.length;
|
||||
ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
|
||||
for (int i = 0; i < decodeChunkNum; i++) {
|
||||
outputs[i] = decodeInputs[decodeIndices[i]];
|
||||
outputs[i].position(0);
|
||||
outputs[i].limit((int) alignedStripe.range.spanInBlock);
|
||||
decodeInputs[decodeIndices[i]] = null;
|
||||
}
|
||||
|
||||
decoder.decode(decodeInputs, decodeIndices, outputs);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* May need online read recovery, zero-copy read doesn't make
|
||||
* sense, so don't support it.
|
||||
@ -957,12 +519,4 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||
throw new UnsupportedOperationException(
|
||||
"Not support enhanced byte buffer access.");
|
||||
}
|
||||
|
||||
/** A variation to {@link DFSInputStream#cancelAll} */
|
||||
private void clearFutures(Collection<Future<Void>> futures) {
|
||||
for (Future<Void> future : futures) {
|
||||
future.cancel(false);
|
||||
}
|
||||
futures.clear();
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,104 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.configuration.SystemConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}
|
||||
* which may cross multiple stripes with cellSize width.
|
||||
*/
|
||||
class PositionStripeReader extends StripeReader {
|
||||
private ByteBuffer codingBuffer;
|
||||
|
||||
PositionStripeReader(AlignedStripe alignedStripe,
|
||||
ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks,
|
||||
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks,
|
||||
RawErasureDecoder decoder, DFSStripedInputStream dfsStripedInputStream) {
|
||||
super(alignedStripe, ecPolicy, targetBlocks, readerInfos,
|
||||
corruptedBlocks, decoder, dfsStripedInputStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
void prepareDecodeInputs() {
|
||||
if (codingBuffer == null) {
|
||||
this.decodeInputs = new ECChunk[dataBlkNum + parityBlkNum];
|
||||
initDecodeInputs(alignedStripe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean prepareParityChunk(int index) {
|
||||
Preconditions.checkState(index >= dataBlkNum &&
|
||||
alignedStripe.chunks[index] == null);
|
||||
|
||||
alignedStripe.chunks[index] =
|
||||
new StripingChunk(decodeInputs[index].getBuffer());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void decode() {
|
||||
finalizeDecodeInputs();
|
||||
decodeAndFillBuffer(true);
|
||||
}
|
||||
|
||||
void initDecodeInputs(AlignedStripe alignedStripe) {
|
||||
int bufLen = (int) alignedStripe.getSpanInBlock();
|
||||
int bufCount = dataBlkNum + parityBlkNum;
|
||||
codingBuffer = dfsStripedInputStream.getBufferPool().
|
||||
getBuffer(useDirectBuffer(), bufLen * bufCount);
|
||||
ByteBuffer buffer;
|
||||
for (int i = 0; i < decodeInputs.length; i++) {
|
||||
buffer = codingBuffer.duplicate();
|
||||
decodeInputs[i] = new ECChunk(buffer, i * bufLen, bufLen);
|
||||
}
|
||||
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
if (alignedStripe.chunks[i] == null) {
|
||||
alignedStripe.chunks[i] =
|
||||
new StripingChunk(decodeInputs[i].getBuffer());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void close() {
|
||||
if (decodeInputs != null) {
|
||||
for (int i = 0; i < decodeInputs.length; i++) {
|
||||
decodeInputs[i] = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (codingBuffer != null) {
|
||||
dfsStripedInputStream.getBufferPool().putBuffer(codingBuffer);
|
||||
codingBuffer = null;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}
|
||||
* which belongs to a single stripe.
|
||||
* Reading cross multiple strips is not supported in this reader.
|
||||
*/
|
||||
class StatefulStripeReader extends StripeReader {
|
||||
|
||||
StatefulStripeReader(AlignedStripe alignedStripe,
|
||||
ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks,
|
||||
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks,
|
||||
RawErasureDecoder decoder, DFSStripedInputStream dfsStripedInputStream) {
|
||||
super(alignedStripe, ecPolicy, targetBlocks, readerInfos,
|
||||
corruptedBlocks, decoder, dfsStripedInputStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
void prepareDecodeInputs() {
|
||||
final ByteBuffer cur;
|
||||
synchronized (dfsStripedInputStream) {
|
||||
cur = dfsStripedInputStream.getCurStripeBuf().duplicate();
|
||||
}
|
||||
|
||||
this.decodeInputs = new ECChunk[dataBlkNum + parityBlkNum];
|
||||
int bufLen = (int) alignedStripe.getSpanInBlock();
|
||||
int bufOff = (int) alignedStripe.getOffsetInBlock();
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
cur.limit(cur.capacity());
|
||||
int pos = bufOff % cellSize + cellSize * i;
|
||||
cur.position(pos);
|
||||
cur.limit(pos + bufLen);
|
||||
decodeInputs[i] = new ECChunk(cur.slice(), 0, bufLen);
|
||||
if (alignedStripe.chunks[i] == null) {
|
||||
alignedStripe.chunks[i] =
|
||||
new StripingChunk(decodeInputs[i].getBuffer());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean prepareParityChunk(int index) {
|
||||
Preconditions.checkState(index >= dataBlkNum
|
||||
&& alignedStripe.chunks[index] == null);
|
||||
if (readerInfos[index] != null && readerInfos[index].shouldSkip) {
|
||||
alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
|
||||
// we have failed the block reader before
|
||||
return false;
|
||||
}
|
||||
final int parityIndex = index - dataBlkNum;
|
||||
ByteBuffer buf = dfsStripedInputStream.getParityBuffer().duplicate();
|
||||
buf.position(cellSize * parityIndex);
|
||||
buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
|
||||
decodeInputs[index] =
|
||||
new ECChunk(buf.slice(), 0, (int) alignedStripe.range.spanInBlock);
|
||||
alignedStripe.chunks[index] =
|
||||
new StripingChunk(decodeInputs[index].getBuffer());
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void decode() {
|
||||
finalizeDecodeInputs();
|
||||
decodeAndFillBuffer(false);
|
||||
}
|
||||
}
|
@ -0,0 +1,463 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.ChecksumException;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
|
||||
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}.
|
||||
* Note that an {@link StripedBlockUtil.AlignedStripe} may cross multiple
|
||||
* stripes with cellSize width.
|
||||
*/
|
||||
abstract class StripeReader {
|
||||
|
||||
static class ReaderRetryPolicy {
|
||||
private int fetchEncryptionKeyTimes = 1;
|
||||
private int fetchTokenTimes = 1;
|
||||
|
||||
void refetchEncryptionKey() {
|
||||
fetchEncryptionKeyTimes--;
|
||||
}
|
||||
|
||||
void refetchToken() {
|
||||
fetchTokenTimes--;
|
||||
}
|
||||
|
||||
boolean shouldRefetchEncryptionKey() {
|
||||
return fetchEncryptionKeyTimes > 0;
|
||||
}
|
||||
|
||||
boolean shouldRefetchToken() {
|
||||
return fetchTokenTimes > 0;
|
||||
}
|
||||
}
|
||||
|
||||
static class BlockReaderInfo {
|
||||
final BlockReader reader;
|
||||
final DatanodeInfo datanode;
|
||||
/**
|
||||
* when initializing block readers, their starting offsets are set to the
|
||||
* same number: the smallest internal block offsets among all the readers.
|
||||
* This is because it is possible that for some internal blocks we have to
|
||||
* read "backwards" for decoding purpose. We thus use this offset array to
|
||||
* track offsets for all the block readers so that we can skip data if
|
||||
* necessary.
|
||||
*/
|
||||
long blockReaderOffset;
|
||||
/**
|
||||
* We use this field to indicate whether we should use this reader. In case
|
||||
* we hit any issue with this reader, we set this field to true and avoid
|
||||
* using it for the next stripe.
|
||||
*/
|
||||
boolean shouldSkip = false;
|
||||
|
||||
BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
|
||||
this.reader = reader;
|
||||
this.datanode = dn;
|
||||
this.blockReaderOffset = offset;
|
||||
}
|
||||
|
||||
void setOffset(long offset) {
|
||||
this.blockReaderOffset = offset;
|
||||
}
|
||||
|
||||
void skip() {
|
||||
this.shouldSkip = true;
|
||||
}
|
||||
}
|
||||
|
||||
protected final Map<Future<Void>, Integer> futures = new HashMap<>();
|
||||
protected final AlignedStripe alignedStripe;
|
||||
protected final CompletionService<Void> service;
|
||||
protected final LocatedBlock[] targetBlocks;
|
||||
protected final CorruptedBlocks corruptedBlocks;
|
||||
protected final BlockReaderInfo[] readerInfos;
|
||||
protected final ErasureCodingPolicy ecPolicy;
|
||||
protected final short dataBlkNum;
|
||||
protected final short parityBlkNum;
|
||||
protected final int cellSize;
|
||||
protected final RawErasureDecoder decoder;
|
||||
protected final DFSStripedInputStream dfsStripedInputStream;
|
||||
|
||||
protected ECChunk[] decodeInputs;
|
||||
|
||||
StripeReader(AlignedStripe alignedStripe,
|
||||
ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks,
|
||||
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks,
|
||||
RawErasureDecoder decoder,
|
||||
DFSStripedInputStream dfsStripedInputStream) {
|
||||
this.alignedStripe = alignedStripe;
|
||||
this.ecPolicy = ecPolicy;
|
||||
this.dataBlkNum = (short)ecPolicy.getNumDataUnits();
|
||||
this.parityBlkNum = (short)ecPolicy.getNumParityUnits();
|
||||
this.cellSize = ecPolicy.getCellSize();
|
||||
this.targetBlocks = targetBlocks;
|
||||
this.readerInfos = readerInfos;
|
||||
this.corruptedBlocks = corruptedBlocks;
|
||||
this.decoder = decoder;
|
||||
this.dfsStripedInputStream = dfsStripedInputStream;
|
||||
|
||||
service = new ExecutorCompletionService<>(
|
||||
dfsStripedInputStream.getStripedReadsThreadPool());
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare all the data chunks.
|
||||
*/
|
||||
abstract void prepareDecodeInputs();
|
||||
|
||||
/**
|
||||
* Prepare the parity chunk and block reader if necessary.
|
||||
*/
|
||||
abstract boolean prepareParityChunk(int index);
|
||||
|
||||
/*
|
||||
* Decode to get the missing data.
|
||||
*/
|
||||
abstract void decode();
|
||||
|
||||
/*
|
||||
* Default close do nothing.
|
||||
*/
|
||||
void close() {
|
||||
}
|
||||
|
||||
void updateState4SuccessRead(StripingChunkReadResult result) {
|
||||
Preconditions.checkArgument(
|
||||
result.state == StripingChunkReadResult.SUCCESSFUL);
|
||||
readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
|
||||
+ alignedStripe.getSpanInBlock());
|
||||
}
|
||||
|
||||
private void checkMissingBlocks() throws IOException {
|
||||
if (alignedStripe.missingChunksNum > parityBlkNum) {
|
||||
clearFutures();
|
||||
throw new IOException(alignedStripe.missingChunksNum
|
||||
+ " missing blocks, the stripe is: " + alignedStripe
|
||||
+ "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We need decoding. Thus go through all the data chunks and make sure we
|
||||
* submit read requests for all of them.
|
||||
*/
|
||||
private void readDataForDecoding() throws IOException {
|
||||
prepareDecodeInputs();
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
Preconditions.checkNotNull(alignedStripe.chunks[i]);
|
||||
if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
|
||||
if (!readChunk(targetBlocks[i], i)) {
|
||||
alignedStripe.missingChunksNum++;
|
||||
}
|
||||
}
|
||||
}
|
||||
checkMissingBlocks();
|
||||
}
|
||||
|
||||
void readParityChunks(int num) throws IOException {
|
||||
for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
|
||||
i++) {
|
||||
if (alignedStripe.chunks[i] == null) {
|
||||
if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
|
||||
j++;
|
||||
} else {
|
||||
alignedStripe.missingChunksNum++;
|
||||
}
|
||||
}
|
||||
}
|
||||
checkMissingBlocks();
|
||||
}
|
||||
|
||||
private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
|
||||
if (chunk.useByteBuffer()) {
|
||||
ByteBufferStrategy strategy = new ByteBufferStrategy(
|
||||
chunk.getByteBuffer(), dfsStripedInputStream.getReadStatistics(),
|
||||
dfsStripedInputStream.getDFSClient());
|
||||
return new ByteBufferStrategy[]{strategy};
|
||||
}
|
||||
|
||||
ByteBufferStrategy[] strategies =
|
||||
new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
|
||||
for (int i = 0; i < strategies.length; i++) {
|
||||
ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
|
||||
strategies[i] = new ByteBufferStrategy(buffer,
|
||||
dfsStripedInputStream.getReadStatistics(),
|
||||
dfsStripedInputStream.getDFSClient());
|
||||
}
|
||||
return strategies;
|
||||
}
|
||||
|
||||
private int readToBuffer(BlockReader blockReader,
|
||||
DatanodeInfo currentNode, ByteBufferStrategy strategy,
|
||||
ExtendedBlock currentBlock) throws IOException {
|
||||
final int targetLength = strategy.getTargetLength();
|
||||
int length = 0;
|
||||
try {
|
||||
while (length < targetLength) {
|
||||
int ret = strategy.readFromBlock(blockReader);
|
||||
if (ret < 0) {
|
||||
throw new IOException("Unexpected EOS from the reader");
|
||||
}
|
||||
length += ret;
|
||||
}
|
||||
return length;
|
||||
} catch (ChecksumException ce) {
|
||||
DFSClient.LOG.warn("Found Checksum error for "
|
||||
+ currentBlock + " from " + currentNode
|
||||
+ " at " + ce.getPos());
|
||||
// we want to remember which block replicas we have tried
|
||||
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
|
||||
throw ce;
|
||||
} catch (IOException e) {
|
||||
DFSClient.LOG.warn("Exception while reading from "
|
||||
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
|
||||
+ currentNode, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private Callable<Void> readCells(final BlockReader reader,
|
||||
final DatanodeInfo datanode, final long currentReaderOffset,
|
||||
final long targetReaderOffset, final ByteBufferStrategy[] strategies,
|
||||
final ExtendedBlock currentBlock) {
|
||||
return () -> {
|
||||
// reader can be null if getBlockReaderWithRetry failed or
|
||||
// the reader hit exception before
|
||||
if (reader == null) {
|
||||
throw new IOException("The BlockReader is null. " +
|
||||
"The BlockReader creation failed or the reader hit exception.");
|
||||
}
|
||||
Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
|
||||
if (currentReaderOffset < targetReaderOffset) {
|
||||
long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
|
||||
Preconditions.checkState(
|
||||
skipped == targetReaderOffset - currentReaderOffset);
|
||||
}
|
||||
|
||||
for (ByteBufferStrategy strategy : strategies) {
|
||||
readToBuffer(reader, datanode, strategy, currentBlock);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
}
|
||||
|
||||
boolean readChunk(final LocatedBlock block, int chunkIndex)
|
||||
throws IOException {
|
||||
final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
|
||||
if (block == null) {
|
||||
chunk.state = StripingChunk.MISSING;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (readerInfos[chunkIndex] == null) {
|
||||
if (!dfsStripedInputStream.createBlockReader(block,
|
||||
alignedStripe.getOffsetInBlock(), targetBlocks,
|
||||
readerInfos, chunkIndex)) {
|
||||
chunk.state = StripingChunk.MISSING;
|
||||
return false;
|
||||
}
|
||||
} else if (readerInfos[chunkIndex].shouldSkip) {
|
||||
chunk.state = StripingChunk.MISSING;
|
||||
return false;
|
||||
}
|
||||
|
||||
chunk.state = StripingChunk.PENDING;
|
||||
Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
|
||||
readerInfos[chunkIndex].datanode,
|
||||
readerInfos[chunkIndex].blockReaderOffset,
|
||||
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
|
||||
block.getBlock());
|
||||
|
||||
Future<Void> request = service.submit(readCallable);
|
||||
futures.put(request, chunkIndex);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* read the whole stripe. do decoding if necessary
|
||||
*/
|
||||
void readStripe() throws IOException {
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
if (alignedStripe.chunks[i] != null &&
|
||||
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
|
||||
if (!readChunk(targetBlocks[i], i)) {
|
||||
alignedStripe.missingChunksNum++;
|
||||
}
|
||||
}
|
||||
}
|
||||
// There are missing block locations at this stage. Thus we need to read
|
||||
// the full stripe and one more parity block.
|
||||
if (alignedStripe.missingChunksNum > 0) {
|
||||
checkMissingBlocks();
|
||||
readDataForDecoding();
|
||||
// read parity chunks
|
||||
readParityChunks(alignedStripe.missingChunksNum);
|
||||
}
|
||||
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
|
||||
|
||||
// Input buffers for potential decode operation, which remains null until
|
||||
// first read failure
|
||||
while (!futures.isEmpty()) {
|
||||
try {
|
||||
StripingChunkReadResult r = StripedBlockUtil
|
||||
.getNextCompletedStripedRead(service, futures, 0);
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
|
||||
+ alignedStripe);
|
||||
}
|
||||
StripingChunk returnedChunk = alignedStripe.chunks[r.index];
|
||||
Preconditions.checkNotNull(returnedChunk);
|
||||
Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
|
||||
|
||||
if (r.state == StripingChunkReadResult.SUCCESSFUL) {
|
||||
returnedChunk.state = StripingChunk.FETCHED;
|
||||
alignedStripe.fetchedChunksNum++;
|
||||
updateState4SuccessRead(r);
|
||||
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
|
||||
clearFutures();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
returnedChunk.state = StripingChunk.MISSING;
|
||||
// close the corresponding reader
|
||||
dfsStripedInputStream.closeReader(readerInfos[r.index]);
|
||||
|
||||
final int missing = alignedStripe.missingChunksNum;
|
||||
alignedStripe.missingChunksNum++;
|
||||
checkMissingBlocks();
|
||||
|
||||
readDataForDecoding();
|
||||
readParityChunks(alignedStripe.missingChunksNum - missing);
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
String err = "Read request interrupted";
|
||||
DFSClient.LOG.error(err);
|
||||
clearFutures();
|
||||
// Don't decode if read interrupted
|
||||
throw new InterruptedIOException(err);
|
||||
}
|
||||
}
|
||||
|
||||
if (alignedStripe.missingChunksNum > 0) {
|
||||
decode();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Some fetched {@link StripingChunk} might be stored in original application
|
||||
* buffer instead of prepared decode input buffers. Some others are beyond
|
||||
* the range of the internal blocks and should correspond to all zero bytes.
|
||||
* When all pending requests have returned, this method should be called to
|
||||
* finalize decode input buffers.
|
||||
*/
|
||||
|
||||
void finalizeDecodeInputs() {
|
||||
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||
final StripingChunk chunk = alignedStripe.chunks[i];
|
||||
if (chunk != null && chunk.state == StripingChunk.FETCHED) {
|
||||
if (chunk.useChunkBuffer()) {
|
||||
chunk.getChunkBuffer().copyTo(decodeInputs[i].getBuffer());
|
||||
} else {
|
||||
chunk.getByteBuffer().flip();
|
||||
}
|
||||
} else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
|
||||
decodeInputs[i].setAllZero(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decode based on the given input buffers and erasure coding policy.
|
||||
*/
|
||||
void decodeAndFillBuffer(boolean fillBuffer) {
|
||||
// Step 1: prepare indices and output buffers for missing data units
|
||||
int[] decodeIndices = prepareErasedIndices();
|
||||
|
||||
final int decodeChunkNum = decodeIndices.length;
|
||||
ECChunk[] outputs = new ECChunk[decodeChunkNum];
|
||||
for (int i = 0; i < decodeChunkNum; i++) {
|
||||
outputs[i] = decodeInputs[decodeIndices[i]];
|
||||
decodeInputs[decodeIndices[i]] = null;
|
||||
}
|
||||
// Step 2: decode into prepared output buffers
|
||||
decoder.decode(decodeInputs, decodeIndices, outputs);
|
||||
|
||||
// Step 3: fill original application buffer with decoded data
|
||||
if (fillBuffer) {
|
||||
for (int i = 0; i < decodeIndices.length; i++) {
|
||||
int missingBlkIdx = decodeIndices[i];
|
||||
StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
|
||||
if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
|
||||
chunk.getChunkBuffer().copyFrom(outputs[i].getBuffer());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare erased indices.
|
||||
*/
|
||||
int[] prepareErasedIndices() {
|
||||
int[] decodeIndices = new int[parityBlkNum];
|
||||
int pos = 0;
|
||||
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||
if (alignedStripe.chunks[i] != null &&
|
||||
alignedStripe.chunks[i].state == StripingChunk.MISSING){
|
||||
decodeIndices[pos++] = i;
|
||||
}
|
||||
}
|
||||
|
||||
int[] erasedIndices = Arrays.copyOf(decodeIndices, pos);
|
||||
return erasedIndices;
|
||||
}
|
||||
|
||||
void clearFutures() {
|
||||
for (Future<Void> future : futures.keySet()) {
|
||||
future.cancel(false);
|
||||
}
|
||||
futures.clear();
|
||||
}
|
||||
|
||||
boolean useDirectBuffer() {
|
||||
return decoder.preferDirectBuffer();
|
||||
}
|
||||
}
|
@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSStripedOutputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -32,7 +31,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||
import org.apache.hadoop.hdfs.DFSStripedOutputStream;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -76,18 +75,6 @@ public class StripedBlockUtil {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(StripedBlockUtil.class);
|
||||
|
||||
/**
|
||||
* Parses a striped block group into individual blocks.
|
||||
* @param bg The striped block group
|
||||
* @param ecPolicy The erasure coding policy
|
||||
* @return An array of the blocks in the group
|
||||
*/
|
||||
public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
|
||||
ErasureCodingPolicy ecPolicy) {
|
||||
return parseStripedBlockGroup(bg, ecPolicy.getCellSize(),
|
||||
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
|
||||
}
|
||||
|
||||
/**
|
||||
* This method parses a striped block group into individual blocks.
|
||||
*
|
||||
@ -112,7 +99,7 @@ public class StripedBlockUtil {
|
||||
}
|
||||
|
||||
/**
|
||||
* This method creates an internal block at the given index of a block group
|
||||
* This method creates an internal block at the given index of a block group.
|
||||
*
|
||||
* @param idxInReturnedLocs The index in the stored locations in the
|
||||
* {@link LocatedStripedBlock} object
|
||||
@ -169,7 +156,7 @@ public class StripedBlockUtil {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the size of an internal block at the given index of a block group
|
||||
* Get the size of an internal block at the given index of a block group.
|
||||
*
|
||||
* @param dataSize Size of the block group only counting data blocks
|
||||
* @param cellSize The size of a striping cell
|
||||
@ -237,7 +224,7 @@ public class StripedBlockUtil {
|
||||
|
||||
/**
|
||||
* Given a byte's offset in an internal block, calculate the offset in
|
||||
* the block group
|
||||
* the block group.
|
||||
*/
|
||||
public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
|
||||
long offsetInBlk, int idxInBlockGroup) {
|
||||
@ -248,12 +235,12 @@ public class StripedBlockUtil {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next completed striped read task
|
||||
* Get the next completed striped read task.
|
||||
*
|
||||
* @return {@link StripingChunkReadResult} indicating the status of the read task
|
||||
* succeeded, and the block index of the task. If the method times
|
||||
* out without getting any completed read tasks, -1 is returned as
|
||||
* block index.
|
||||
* @return {@link StripingChunkReadResult} indicating the status of the read
|
||||
* task succeeded, and the block index of the task. If the method
|
||||
* times out without getting any completed read tasks, -1 is
|
||||
* returned as block index.
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public static StripingChunkReadResult getNextCompletedStripedRead(
|
||||
@ -287,7 +274,7 @@ public class StripedBlockUtil {
|
||||
|
||||
/**
|
||||
* Get the total usage of the striped blocks, which is the total of data
|
||||
* blocks and parity blocks
|
||||
* blocks and parity blocks.
|
||||
*
|
||||
* @param numDataBlkBytes
|
||||
* Size of the block group only counting data blocks
|
||||
@ -307,91 +294,6 @@ public class StripedBlockUtil {
|
||||
return numDataBlkBytes + numParityBlkBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the decoding input buffers based on the chunk states in an
|
||||
* {@link AlignedStripe}. For each chunk that was not initially requested,
|
||||
* schedule a new fetch request with the decoding input buffer as transfer
|
||||
* destination.
|
||||
*/
|
||||
public static ByteBuffer[] initDecodeInputs(AlignedStripe alignedStripe,
|
||||
int dataBlkNum, int parityBlkNum) {
|
||||
ByteBuffer[] decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
|
||||
for (int i = 0; i < decodeInputs.length; i++) {
|
||||
decodeInputs[i] = ByteBuffer.allocate(
|
||||
(int) alignedStripe.getSpanInBlock());
|
||||
}
|
||||
// read the full data aligned stripe
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
if (alignedStripe.chunks[i] == null) {
|
||||
alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
|
||||
}
|
||||
}
|
||||
return decodeInputs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Some fetched {@link StripingChunk} might be stored in original application
|
||||
* buffer instead of prepared decode input buffers. Some others are beyond
|
||||
* the range of the internal blocks and should correspond to all zero bytes.
|
||||
* When all pending requests have returned, this method should be called to
|
||||
* finalize decode input buffers.
|
||||
*/
|
||||
public static void finalizeDecodeInputs(final ByteBuffer[] decodeInputs,
|
||||
AlignedStripe alignedStripe) {
|
||||
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||
final StripingChunk chunk = alignedStripe.chunks[i];
|
||||
if (chunk != null && chunk.state == StripingChunk.FETCHED) {
|
||||
if (chunk.useChunkBuffer()) {
|
||||
chunk.getChunkBuffer().copyTo(decodeInputs[i]);
|
||||
} else {
|
||||
chunk.getByteBuffer().flip();
|
||||
}
|
||||
} else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
|
||||
//ZERO it. Will be better handled in other following issue.
|
||||
byte[] emptyBytes = new byte[decodeInputs[i].limit()];
|
||||
decodeInputs[i].put(emptyBytes);
|
||||
decodeInputs[i].flip();
|
||||
} else {
|
||||
decodeInputs[i] = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decode based on the given input buffers and erasure coding policy.
|
||||
*/
|
||||
public static void decodeAndFillBuffer(final ByteBuffer[] decodeInputs,
|
||||
AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
|
||||
RawErasureDecoder decoder) {
|
||||
// Step 1: prepare indices and output buffers for missing data units
|
||||
int[] decodeIndices = new int[parityBlkNum];
|
||||
int pos = 0;
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
if (alignedStripe.chunks[i] != null &&
|
||||
alignedStripe.chunks[i].state == StripingChunk.MISSING){
|
||||
decodeIndices[pos++] = i;
|
||||
}
|
||||
}
|
||||
decodeIndices = Arrays.copyOf(decodeIndices, pos);
|
||||
ByteBuffer[] decodeOutputs = new ByteBuffer[decodeIndices.length];
|
||||
for (int i = 0; i < decodeOutputs.length; i++) {
|
||||
decodeOutputs[i] = ByteBuffer.allocate(
|
||||
(int) alignedStripe.getSpanInBlock());
|
||||
}
|
||||
|
||||
// Step 2: decode into prepared output buffers
|
||||
decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
|
||||
|
||||
// Step 3: fill original application buffer with decoded data
|
||||
for (int i = 0; i < decodeIndices.length; i++) {
|
||||
int missingBlkIdx = decodeIndices[i];
|
||||
StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
|
||||
if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
|
||||
chunk.getChunkBuffer().copyFrom(decodeOutputs[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar functionality with {@link #divideByteRangeIntoStripes}, but is used
|
||||
* by stateful read and uses ByteBuffer as reading target buffer. Besides the
|
||||
@ -485,7 +387,7 @@ public class StripedBlockUtil {
|
||||
/**
|
||||
* Map the logical byte range to a set of inclusive {@link StripingCell}
|
||||
* instances, each representing the overlap of the byte range to a cell
|
||||
* used by {@link DFSStripedOutputStream} in encoding
|
||||
* used by {@link DFSStripedOutputStream} in encoding.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
private static StripingCell[] getStripingCellsOfByteRange(
|
||||
@ -530,7 +432,7 @@ public class StripedBlockUtil {
|
||||
int dataBlkNum = ecPolicy.getNumDataUnits();
|
||||
int parityBlkNum = ecPolicy.getNumParityUnits();
|
||||
|
||||
VerticalRange ranges[] = new VerticalRange[dataBlkNum + parityBlkNum];
|
||||
VerticalRange[] ranges = new VerticalRange[dataBlkNum + parityBlkNum];
|
||||
|
||||
long earliestStart = Long.MAX_VALUE;
|
||||
long latestEnd = -1;
|
||||
@ -675,7 +577,7 @@ public class StripedBlockUtil {
|
||||
@VisibleForTesting
|
||||
static class StripingCell {
|
||||
final ErasureCodingPolicy ecPolicy;
|
||||
/** Logical order in a block group, used when doing I/O to a block group */
|
||||
/** Logical order in a block group, used when doing I/O to a block group. */
|
||||
final int idxInBlkGroup;
|
||||
final int idxInInternalBlk;
|
||||
final int idxInStripe;
|
||||
@ -738,7 +640,7 @@ public class StripedBlockUtil {
|
||||
*/
|
||||
public static class AlignedStripe {
|
||||
public VerticalRange range;
|
||||
/** status of each chunk in the stripe */
|
||||
/** status of each chunk in the stripe. */
|
||||
public final StripingChunk[] chunks;
|
||||
public int fetchedChunksNum = 0;
|
||||
public int missingChunksNum = 0;
|
||||
@ -790,9 +692,9 @@ public class StripedBlockUtil {
|
||||
* +-----+
|
||||
*/
|
||||
public static class VerticalRange {
|
||||
/** start offset in the block group (inclusive) */
|
||||
/** start offset in the block group (inclusive). */
|
||||
public long offsetInBlock;
|
||||
/** length of the stripe range */
|
||||
/** length of the stripe range. */
|
||||
public long spanInBlock;
|
||||
|
||||
public VerticalRange(long offsetInBlock, long length) {
|
||||
@ -801,7 +703,7 @@ public class StripedBlockUtil {
|
||||
this.spanInBlock = length;
|
||||
}
|
||||
|
||||
/** whether a position is in the range */
|
||||
/** whether a position is in the range. */
|
||||
public boolean include(long pos) {
|
||||
return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
|
||||
}
|
||||
@ -915,7 +817,7 @@ public class StripedBlockUtil {
|
||||
/**
|
||||
* Note: target will be ready-to-read state after the call.
|
||||
*/
|
||||
void copyTo(ByteBuffer target) {
|
||||
public void copyTo(ByteBuffer target) {
|
||||
for (ByteBuffer slice : slices) {
|
||||
slice.flip();
|
||||
target.put(slice);
|
||||
@ -923,7 +825,7 @@ public class StripedBlockUtil {
|
||||
target.flip();
|
||||
}
|
||||
|
||||
void copyFrom(ByteBuffer src) {
|
||||
public void copyFrom(ByteBuffer src) {
|
||||
ByteBuffer tmp;
|
||||
int len;
|
||||
for (ByteBuffer slice : slices) {
|
||||
@ -970,6 +872,28 @@ public class StripedBlockUtil {
|
||||
}
|
||||
}
|
||||
|
||||
/** Used to indicate the buffered data's range in the block group. */
|
||||
public static class StripeRange {
|
||||
/** start offset in the block group (inclusive). */
|
||||
final long offsetInBlock;
|
||||
/** length of the stripe range. */
|
||||
final long length;
|
||||
|
||||
public StripeRange(long offsetInBlock, long length) {
|
||||
Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
|
||||
this.offsetInBlock = offsetInBlock;
|
||||
this.length = length;
|
||||
}
|
||||
|
||||
public boolean include(long pos) {
|
||||
return pos >= offsetInBlock && pos < offsetInBlock + length;
|
||||
}
|
||||
|
||||
public long getLength() {
|
||||
return length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the information such as IDs and generation stamps in block-i
|
||||
* match the block group.
|
||||
|
@ -45,7 +45,9 @@ import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
||||
import org.apache.hadoop.crypto.key.kms.server.EagerKeyGeneratorKeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
@ -734,14 +736,33 @@ public class TestEncryptionZones {
|
||||
// Roll the key of the encryption zone
|
||||
assertNumZones(1);
|
||||
String keyName = dfsAdmin.listEncryptionZones().next().getKeyName();
|
||||
FileEncryptionInfo feInfo1 = getFileEncryptionInfo(encFile1);
|
||||
cluster.getNamesystem().getProvider().rollNewVersion(keyName);
|
||||
/**
|
||||
* due to the cache on the server side, client may get old keys.
|
||||
* @see EagerKeyGeneratorKeyProviderCryptoExtension#rollNewVersion(String)
|
||||
*/
|
||||
boolean rollSucceeded = false;
|
||||
for (int i = 0; i <= EagerKeyGeneratorKeyProviderCryptoExtension
|
||||
.KMS_KEY_CACHE_SIZE_DEFAULT + CommonConfigurationKeysPublic.
|
||||
KMS_CLIENT_ENC_KEY_CACHE_SIZE_DEFAULT; ++i) {
|
||||
KeyProviderCryptoExtension.EncryptedKeyVersion ekv2 =
|
||||
cluster.getNamesystem().getProvider().generateEncryptedKey(TEST_KEY);
|
||||
if (!(feInfo1.getEzKeyVersionName()
|
||||
.equals(ekv2.getEncryptionKeyVersionName()))) {
|
||||
rollSucceeded = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assert.assertTrue("rollover did not generate a new key even after"
|
||||
+ " queue is drained", rollSucceeded);
|
||||
|
||||
// Read them back in and compare byte-by-byte
|
||||
verifyFilesEqual(fs, baseFile, encFile1, len);
|
||||
// Write a new enc file and validate
|
||||
final Path encFile2 = new Path(zone, "myfile2");
|
||||
DFSTestUtil.createFile(fs, encFile2, len, (short) 1, 0xFEED);
|
||||
// FEInfos should be different
|
||||
FileEncryptionInfo feInfo1 = getFileEncryptionInfo(encFile1);
|
||||
FileEncryptionInfo feInfo2 = getFileEncryptionInfo(encFile2);
|
||||
assertFalse("EDEKs should be different", Arrays
|
||||
.equals(feInfo1.getEncryptedDataEncryptionKey(),
|
||||
|
@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.ServerSocketUtil;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
@ -349,7 +350,12 @@ public class TestBlockTokenWithDFS {
|
||||
Configuration conf = getConf(numDataNodes);
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
|
||||
// prefer non-ephemeral port to avoid port collision on restartNameNode
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nameNodePort(ServerSocketUtil.getPort(19820, 100))
|
||||
.nameNodeHttpPort(ServerSocketUtil.getPort(19870, 100))
|
||||
.numDataNodes(numDataNodes)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
assertEquals(numDataNodes, cluster.getDataNodes().size());
|
||||
doTestRead(conf, cluster, false);
|
||||
|
@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.net.ServerSocketUtil;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
@ -59,7 +60,27 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
|
||||
@Override
|
||||
public void testRead() throws Exception {
|
||||
conf = getConf();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
||||
|
||||
/*
|
||||
* prefer non-ephemeral port to avoid conflict with tests using
|
||||
* ephemeral ports on MiniDFSCluster#restartDataNode(true).
|
||||
*/
|
||||
Configuration[] overlays = new Configuration[numDNs];
|
||||
for (int i = 0; i < overlays.length; i++) {
|
||||
int offset = i * 10;
|
||||
Configuration c = new Configuration();
|
||||
c.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:"
|
||||
+ ServerSocketUtil.getPort(19866 + offset, 100));
|
||||
c.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:"
|
||||
+ ServerSocketUtil.getPort(19867 + offset, 100));
|
||||
overlays[i] = c;
|
||||
}
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nameNodePort(ServerSocketUtil.getPort(19820, 100))
|
||||
.nameNodeHttpPort(ServerSocketUtil.getPort(19870, 100))
|
||||
.numDataNodes(numDNs)
|
||||
.build();
|
||||
cluster.getFileSystem().getClient()
|
||||
.setErasureCodingPolicy("/", null);
|
||||
try {
|
||||
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
||||
import org.apache.hadoop.hdfs.server.namenode.MockNameNodeResourceChecker;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.net.ServerSocketUtil;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
|
||||
@ -75,14 +76,21 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes {
|
||||
conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
|
||||
0);
|
||||
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn1", 10023);
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn2", 10024);
|
||||
|
||||
// Get random port numbers in advance. Because ZKFCs and DFSHAAdmin
|
||||
// needs rpc port numbers of all ZKFCs, Setting 0 does not work here.
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn1",
|
||||
ServerSocketUtil.getPort(10023, 100));
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn2",
|
||||
ServerSocketUtil.getPort(10024, 100));
|
||||
|
||||
// prefer non-ephemeral port to avoid port collision on restartNameNode
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10021))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10022)));
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1")
|
||||
.setIpcPort(ServerSocketUtil.getPort(10021, 100)))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2")
|
||||
.setIpcPort(ServerSocketUtil.getPort(10022, 100))));
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(topology)
|
||||
.numDataNodes(0)
|
||||
|
@ -283,5 +283,4 @@ public class TestStripedBlockUtil {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -564,4 +564,14 @@
|
||||
</Or>
|
||||
<Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
|
||||
</Match>
|
||||
|
||||
<!-- Ignore VO_VOLATILE_INCREMENT, they will be protected by writeLock -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User" />
|
||||
<Or>
|
||||
<Field name="pendingApplications" />
|
||||
<Field name="activeApplications" />
|
||||
</Or>
|
||||
<Bug pattern="VO_VOLATILE_INCREMENT" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
|
@ -35,6 +35,7 @@ import org.apache.hadoop.http.HtmlQuoting;
|
||||
import org.apache.hadoop.yarn.webapp.Controller.RequestContext;
|
||||
import org.apache.hadoop.yarn.webapp.Router.Dest;
|
||||
import org.apache.hadoop.yarn.webapp.view.ErrorPage;
|
||||
import org.apache.hadoop.yarn.webapp.view.RobotsTextPage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -117,6 +118,14 @@ public class Dispatcher extends HttpServlet {
|
||||
}
|
||||
Controller.RequestContext rc =
|
||||
injector.getInstance(Controller.RequestContext.class);
|
||||
|
||||
//short-circuit robots.txt serving for all YARN webapps.
|
||||
if (uri.equals(RobotsTextPage.ROBOTS_TXT_PATH)) {
|
||||
rc.setStatus(HttpServletResponse.SC_FOUND);
|
||||
render(RobotsTextPage.class);
|
||||
return;
|
||||
}
|
||||
|
||||
if (setCookieParams(rc, req) > 0) {
|
||||
Cookie ec = rc.cookies().get(ERROR_COOKIE);
|
||||
if (ec != null) {
|
||||
|
@ -29,6 +29,7 @@ import java.util.Map;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
import org.apache.hadoop.yarn.webapp.view.RobotsTextPage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -158,7 +159,8 @@ public abstract class WebApp extends ServletModule {
|
||||
public void configureServlets() {
|
||||
setup();
|
||||
|
||||
serve("/", "/__stop").with(Dispatcher.class);
|
||||
serve("/", "/__stop", RobotsTextPage.ROBOTS_TXT_PATH)
|
||||
.with(Dispatcher.class);
|
||||
|
||||
for (String path : this.servePathSpecs) {
|
||||
serve(path).with(Dispatcher.class);
|
||||
|
@ -0,0 +1,39 @@
|
||||
/*
|
||||
* *
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* /
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.webapp.view;
|
||||
|
||||
/**
|
||||
* Simple class that renders a robot.txt page that disallows crawling.
|
||||
*/
|
||||
|
||||
public class RobotsTextPage extends TextPage {
|
||||
public static final String ROBOTS_TXT = "robots.txt";
|
||||
public static final String ROBOTS_TXT_PATH = "/" + ROBOTS_TXT;
|
||||
|
||||
static final String USER_AGENT_LINE = "User-agent: *";
|
||||
static final String DISALLOW_LINE = "Disallow: /";
|
||||
|
||||
@Override
|
||||
public void render() {
|
||||
putWithoutEscapeHtml(USER_AGENT_LINE);
|
||||
putWithoutEscapeHtml(DISALLOW_LINE);
|
||||
}
|
||||
}
|
@ -38,6 +38,7 @@ import org.apache.commons.lang.ArrayUtils;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlPage;
|
||||
import org.apache.hadoop.yarn.webapp.view.JQueryUI;
|
||||
import org.apache.hadoop.yarn.webapp.view.RobotsTextPage;
|
||||
import org.apache.hadoop.yarn.webapp.view.TextPage;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
@ -260,6 +261,31 @@ public class TestWebApp {
|
||||
}
|
||||
}
|
||||
|
||||
@Test public void testRobotsText() throws Exception {
|
||||
WebApp app =
|
||||
WebApps.$for("test", TestWebApp.class, this, "ws").start(new WebApp() {
|
||||
@Override
|
||||
public void setup() {
|
||||
bind(MyTestJAXBContextResolver.class);
|
||||
bind(MyTestWebService.class);
|
||||
}
|
||||
});
|
||||
String baseUrl = baseUrl(app);
|
||||
try {
|
||||
//using system line separator here since that is what
|
||||
// TextView (via PrintWriter) seems to use.
|
||||
String[] robotsTxtOutput = getContent(baseUrl +
|
||||
RobotsTextPage.ROBOTS_TXT).trim().split(System.getProperty("line"
|
||||
+ ".separator"));
|
||||
|
||||
assertEquals(2, robotsTxtOutput.length);
|
||||
assertEquals("User-agent: *", robotsTxtOutput[0]);
|
||||
assertEquals("Disallow: /", robotsTxtOutput[1]);
|
||||
} finally {
|
||||
app.stop();
|
||||
}
|
||||
}
|
||||
|
||||
// This is to test the GuiceFilter should only be applied to webAppContext,
|
||||
// not to logContext;
|
||||
@Test public void testYARNWebAppContext() throws Exception {
|
||||
|
@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||
|
||||
@ -39,17 +38,6 @@ public class NoOverCommitPolicy implements SharingPolicy {
|
||||
public void validate(Plan plan, ReservationAllocation reservation)
|
||||
throws PlanningException {
|
||||
|
||||
ReservationAllocation oldReservation =
|
||||
plan.getReservationById(reservation.getReservationId());
|
||||
|
||||
// check updates are using same name
|
||||
if (oldReservation != null
|
||||
&& !oldReservation.getUser().equals(reservation.getUser())) {
|
||||
throw new MismatchedUserException(
|
||||
"Updating an existing reservation with mismatching user:"
|
||||
+ oldReservation.getUser() + " != " + reservation.getUser());
|
||||
}
|
||||
|
||||
RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
|
||||
reservation.getUser(), reservation.getReservationId(),
|
||||
reservation.getStartTime(), reservation.getEndTime());
|
||||
|
@ -1,46 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
|
||||
/**
|
||||
* Exception thrown when an update to an existing reservation is performed
|
||||
* by a user that is not the reservation owner.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public class MismatchedUserException extends PlanningException {
|
||||
|
||||
private static final long serialVersionUID = 8313222590561668413L;
|
||||
|
||||
public MismatchedUserException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public MismatchedUserException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public MismatchedUserException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
}
|
@ -24,6 +24,7 @@ import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -60,25 +61,25 @@ import com.google.common.collect.Sets;
|
||||
|
||||
public abstract class AbstractCSQueue implements CSQueue {
|
||||
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
|
||||
CSQueue parent;
|
||||
volatile CSQueue parent;
|
||||
final String queueName;
|
||||
volatile int numContainers;
|
||||
|
||||
final Resource minimumAllocation;
|
||||
volatile Resource maximumAllocation;
|
||||
QueueState state;
|
||||
volatile QueueState state;
|
||||
final CSQueueMetrics metrics;
|
||||
protected final PrivilegedEntity queueEntity;
|
||||
|
||||
final ResourceCalculator resourceCalculator;
|
||||
Set<String> accessibleLabels;
|
||||
RMNodeLabelsManager labelManager;
|
||||
final RMNodeLabelsManager labelManager;
|
||||
String defaultLabelExpression;
|
||||
|
||||
Map<AccessType, AccessControlList> acls =
|
||||
new HashMap<AccessType, AccessControlList>();
|
||||
volatile boolean reservationsContinueLooking;
|
||||
private boolean preemptionDisabled;
|
||||
private volatile boolean preemptionDisabled;
|
||||
|
||||
// Track resource usage-by-label like used-resource/pending-resource, etc.
|
||||
volatile ResourceUsage queueUsage;
|
||||
@ -94,6 +95,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
|
||||
protected ActivitiesManager activitiesManager;
|
||||
|
||||
protected ReentrantReadWriteLock.ReadLock readLock;
|
||||
protected ReentrantReadWriteLock.WriteLock writeLock;
|
||||
|
||||
public AbstractCSQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
this.labelManager = cs.getRMContext().getNodeLabelManager();
|
||||
@ -116,7 +120,11 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
|
||||
|
||||
// initialize QueueCapacities
|
||||
queueCapacities = new QueueCapacities(parent == null);
|
||||
queueCapacities = new QueueCapacities(parent == null);
|
||||
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
readLock = lock.readLock();
|
||||
writeLock = lock.writeLock();
|
||||
}
|
||||
|
||||
protected void setupConfigurableCapacities() {
|
||||
@ -128,12 +136,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized float getCapacity() {
|
||||
public float getCapacity() {
|
||||
return queueCapacities.getCapacity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized float getAbsoluteCapacity() {
|
||||
public float getAbsoluteCapacity() {
|
||||
return queueCapacities.getAbsoluteCapacity();
|
||||
}
|
||||
|
||||
@ -167,7 +175,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized QueueState getState() {
|
||||
public QueueState getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@ -187,13 +195,13 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized CSQueue getParent() {
|
||||
public CSQueue getParent() {
|
||||
return parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setParent(CSQueue newParentQueue) {
|
||||
this.parent = (ParentQueue)newParentQueue;
|
||||
public void setParent(CSQueue newParentQueue) {
|
||||
this.parent = newParentQueue;
|
||||
}
|
||||
|
||||
public Set<String> getAccessibleNodeLabels() {
|
||||
@ -221,18 +229,22 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
* Set maximum capacity - used only for testing.
|
||||
* @param maximumCapacity new max capacity
|
||||
*/
|
||||
synchronized void setMaxCapacity(float maximumCapacity) {
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(),
|
||||
queueCapacities.getCapacity(), maximumCapacity);
|
||||
float absMaxCapacity =
|
||||
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
|
||||
CSQueueUtils.checkAbsoluteCapacity(getQueueName(),
|
||||
queueCapacities.getAbsoluteCapacity(),
|
||||
absMaxCapacity);
|
||||
|
||||
queueCapacities.setMaximumCapacity(maximumCapacity);
|
||||
queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
|
||||
void setMaxCapacity(float maximumCapacity) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(),
|
||||
queueCapacities.getCapacity(), maximumCapacity);
|
||||
float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(
|
||||
maximumCapacity, parent);
|
||||
CSQueueUtils.checkAbsoluteCapacity(getQueueName(),
|
||||
queueCapacities.getAbsoluteCapacity(), absMaxCapacity);
|
||||
|
||||
queueCapacities.setMaximumCapacity(maximumCapacity);
|
||||
queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -240,70 +252,82 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
return defaultLabelExpression;
|
||||
}
|
||||
|
||||
synchronized void setupQueueConfigs(Resource clusterResource)
|
||||
void setupQueueConfigs(Resource clusterResource)
|
||||
throws IOException {
|
||||
// get labels
|
||||
this.accessibleLabels =
|
||||
csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
|
||||
this.defaultLabelExpression = csContext.getConfiguration()
|
||||
.getDefaultNodeLabelExpression(getQueuePath());
|
||||
try {
|
||||
writeLock.lock();
|
||||
// get labels
|
||||
this.accessibleLabels =
|
||||
csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
|
||||
this.defaultLabelExpression =
|
||||
csContext.getConfiguration().getDefaultNodeLabelExpression(
|
||||
getQueuePath());
|
||||
|
||||
// inherit from parent if labels not set
|
||||
if (this.accessibleLabels == null && parent != null) {
|
||||
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
||||
}
|
||||
|
||||
// inherit from parent if labels not set
|
||||
if (this.defaultLabelExpression == null && parent != null
|
||||
&& this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
|
||||
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
|
||||
}
|
||||
// inherit from parent if labels not set
|
||||
if (this.accessibleLabels == null && parent != null) {
|
||||
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
||||
}
|
||||
|
||||
// After we setup labels, we can setup capacities
|
||||
setupConfigurableCapacities();
|
||||
|
||||
this.maximumAllocation =
|
||||
csContext.getConfiguration().getMaximumAllocationPerQueue(
|
||||
getQueuePath());
|
||||
|
||||
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
|
||||
|
||||
this.state = csContext.getConfiguration().getState(getQueuePath());
|
||||
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
|
||||
// inherit from parent if labels not set
|
||||
if (this.defaultLabelExpression == null && parent != null
|
||||
&& this.accessibleLabels.containsAll(
|
||||
parent.getAccessibleNodeLabels())) {
|
||||
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
|
||||
}
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
|
||||
// Check if labels of this queue is a subset of parent queue, only do this
|
||||
// when we not root
|
||||
if (parent != null && parent.getParent() != null) {
|
||||
if (parent.getAccessibleNodeLabels() != null
|
||||
&& !parent.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||
// if parent isn't "*", child shouldn't be "*" too
|
||||
if (this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||
throw new IOException("Parent's accessible queue is not ANY(*), "
|
||||
+ "but child's accessible queue is *");
|
||||
} else {
|
||||
Set<String> diff =
|
||||
Sets.difference(this.getAccessibleNodeLabels(),
|
||||
parent.getAccessibleNodeLabels());
|
||||
if (!diff.isEmpty()) {
|
||||
throw new IOException("Some labels of child queue is not a subset "
|
||||
+ "of parent queue, these labels=["
|
||||
+ StringUtils.join(diff, ",") + "]");
|
||||
// After we setup labels, we can setup capacities
|
||||
setupConfigurableCapacities();
|
||||
|
||||
this.maximumAllocation =
|
||||
csContext.getConfiguration().getMaximumAllocationPerQueue(
|
||||
getQueuePath());
|
||||
|
||||
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
|
||||
|
||||
this.state = csContext.getConfiguration().getState(getQueuePath());
|
||||
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
|
||||
// Check if labels of this queue is a subset of parent queue, only do this
|
||||
// when we not root
|
||||
if (parent != null && parent.getParent() != null) {
|
||||
if (parent.getAccessibleNodeLabels() != null && !parent
|
||||
.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||
// if parent isn't "*", child shouldn't be "*" too
|
||||
if (this.getAccessibleNodeLabels().contains(
|
||||
RMNodeLabelsManager.ANY)) {
|
||||
throw new IOException("Parent's accessible queue is not ANY(*), "
|
||||
+ "but child's accessible queue is *");
|
||||
} else{
|
||||
Set<String> diff = Sets.difference(this.getAccessibleNodeLabels(),
|
||||
parent.getAccessibleNodeLabels());
|
||||
if (!diff.isEmpty()) {
|
||||
throw new IOException(
|
||||
"Some labels of child queue is not a subset "
|
||||
+ "of parent queue, these labels=[" + StringUtils
|
||||
.join(diff, ",") + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.reservationsContinueLooking =
|
||||
csContext.getConfiguration().getReservationContinueLook();
|
||||
|
||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
||||
this.reservationsContinueLooking = csContext.getConfiguration()
|
||||
.getReservationContinueLook();
|
||||
|
||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
||||
}
|
||||
|
||||
|
||||
protected QueueInfo getQueueInfo() {
|
||||
// Deliberately doesn't use lock here, because this method will be invoked
|
||||
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
|
||||
// consistency here.
|
||||
// TODO, improve this
|
||||
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
||||
queueInfo.setQueueName(queueName);
|
||||
queueInfo.setAccessibleNodeLabels(accessibleLabels);
|
||||
@ -318,8 +342,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
}
|
||||
|
||||
public QueueStatistics getQueueStatistics() {
|
||||
QueueStatistics stats =
|
||||
recordFactory.newRecordInstance(QueueStatistics.class);
|
||||
// Deliberately doesn't use lock here, because this method will be invoked
|
||||
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
|
||||
// consistency here.
|
||||
// TODO, improve this
|
||||
QueueStatistics stats = recordFactory.newRecordInstance(
|
||||
QueueStatistics.class);
|
||||
stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted());
|
||||
stats.setNumAppsRunning(getMetrics().getAppsRunning());
|
||||
stats.setNumAppsPending(getMetrics().getAppsPending());
|
||||
@ -351,26 +379,36 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
return minimumAllocation;
|
||||
}
|
||||
|
||||
synchronized void allocateResource(Resource clusterResource,
|
||||
void allocateResource(Resource clusterResource,
|
||||
Resource resource, String nodePartition, boolean changeContainerResource) {
|
||||
queueUsage.incUsed(nodePartition, resource);
|
||||
try {
|
||||
writeLock.lock();
|
||||
queueUsage.incUsed(nodePartition, resource);
|
||||
|
||||
if (!changeContainerResource) {
|
||||
++numContainers;
|
||||
if (!changeContainerResource) {
|
||||
++numContainers;
|
||||
}
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
}
|
||||
|
||||
protected synchronized void releaseResource(Resource clusterResource,
|
||||
protected void releaseResource(Resource clusterResource,
|
||||
Resource resource, String nodePartition, boolean changeContainerResource) {
|
||||
queueUsage.decUsed(nodePartition, resource);
|
||||
try {
|
||||
writeLock.lock();
|
||||
queueUsage.decUsed(nodePartition, resource);
|
||||
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, nodePartition);
|
||||
|
||||
if (!changeContainerResource) {
|
||||
--numContainers;
|
||||
if (!changeContainerResource) {
|
||||
--numContainers;
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -381,7 +419,13 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
|
||||
@Private
|
||||
public Map<AccessType, AccessControlList> getACLs() {
|
||||
return acls;
|
||||
try {
|
||||
readLock.lock();
|
||||
return acls;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Private
|
||||
@ -464,86 +508,88 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
minimumAllocation);
|
||||
}
|
||||
|
||||
synchronized boolean canAssignToThisQueue(Resource clusterResource,
|
||||
boolean canAssignToThisQueue(Resource clusterResource,
|
||||
String nodePartition, ResourceLimits currentResourceLimits,
|
||||
Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
|
||||
// Get current limited resource:
|
||||
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
|
||||
// queues' max capacity.
|
||||
// - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect
|
||||
// queue's max capacity, queue's max capacity on the partition will be
|
||||
// considered to be 100%. Which is a queue can use all resource in the
|
||||
// partition.
|
||||
// Doing this because: for non-exclusive allocation, we make sure there's
|
||||
// idle resource on the partition, to avoid wastage, such resource will be
|
||||
// leveraged as much as we can, and preemption policy will reclaim it back
|
||||
// when partitoned-resource-request comes back.
|
||||
Resource currentLimitResource =
|
||||
getCurrentLimitResource(nodePartition, clusterResource,
|
||||
currentResourceLimits, schedulingMode);
|
||||
try {
|
||||
readLock.lock();
|
||||
// Get current limited resource:
|
||||
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
|
||||
// queues' max capacity.
|
||||
// - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect
|
||||
// queue's max capacity, queue's max capacity on the partition will be
|
||||
// considered to be 100%. Which is a queue can use all resource in the
|
||||
// partition.
|
||||
// Doing this because: for non-exclusive allocation, we make sure there's
|
||||
// idle resource on the partition, to avoid wastage, such resource will be
|
||||
// leveraged as much as we can, and preemption policy will reclaim it back
|
||||
// when partitoned-resource-request comes back.
|
||||
Resource currentLimitResource = getCurrentLimitResource(nodePartition,
|
||||
clusterResource, currentResourceLimits, schedulingMode);
|
||||
|
||||
Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
|
||||
Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
|
||||
|
||||
// Set headroom for currentResourceLimits:
|
||||
// When queue is a parent queue: Headroom = limit - used + killable
|
||||
// When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself)
|
||||
Resource usedExceptKillable = nowTotalUsed;
|
||||
if (null != getChildQueues() && !getChildQueues().isEmpty()) {
|
||||
usedExceptKillable = Resources.subtract(nowTotalUsed,
|
||||
getTotalKillableResource(nodePartition));
|
||||
}
|
||||
currentResourceLimits.setHeadroom(
|
||||
Resources.subtract(currentLimitResource, usedExceptKillable));
|
||||
// Set headroom for currentResourceLimits:
|
||||
// When queue is a parent queue: Headroom = limit - used + killable
|
||||
// When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself)
|
||||
Resource usedExceptKillable = nowTotalUsed;
|
||||
if (null != getChildQueues() && !getChildQueues().isEmpty()) {
|
||||
usedExceptKillable = Resources.subtract(nowTotalUsed,
|
||||
getTotalKillableResource(nodePartition));
|
||||
}
|
||||
currentResourceLimits.setHeadroom(
|
||||
Resources.subtract(currentLimitResource, usedExceptKillable));
|
||||
|
||||
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
|
||||
usedExceptKillable, currentLimitResource)) {
|
||||
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
|
||||
usedExceptKillable, currentLimitResource)) {
|
||||
|
||||
// if reservation continous looking enabled, check to see if could we
|
||||
// potentially use this node instead of a reserved node if the application
|
||||
// has reserved containers.
|
||||
// TODO, now only consider reservation cases when the node has no label
|
||||
if (this.reservationsContinueLooking
|
||||
&& nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
|
||||
&& Resources.greaterThan(resourceCalculator, clusterResource,
|
||||
resourceCouldBeUnreserved, Resources.none())) {
|
||||
// resource-without-reserved = used - reserved
|
||||
Resource newTotalWithoutReservedResource =
|
||||
Resources.subtract(usedExceptKillable, resourceCouldBeUnreserved);
|
||||
// if reservation continous looking enabled, check to see if could we
|
||||
// potentially use this node instead of a reserved node if the application
|
||||
// has reserved containers.
|
||||
// TODO, now only consider reservation cases when the node has no label
|
||||
if (this.reservationsContinueLooking && nodePartition.equals(
|
||||
RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan(
|
||||
resourceCalculator, clusterResource, resourceCouldBeUnreserved,
|
||||
Resources.none())) {
|
||||
// resource-without-reserved = used - reserved
|
||||
Resource newTotalWithoutReservedResource = Resources.subtract(
|
||||
usedExceptKillable, resourceCouldBeUnreserved);
|
||||
|
||||
// when total-used-without-reserved-resource < currentLimit, we still
|
||||
// have chance to allocate on this node by unreserving some containers
|
||||
if (Resources.lessThan(resourceCalculator, clusterResource,
|
||||
newTotalWithoutReservedResource, currentLimitResource)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("try to use reserved: " + getQueueName()
|
||||
+ " usedResources: " + queueUsage.getUsed()
|
||||
+ ", clusterResources: " + clusterResource
|
||||
+ ", reservedResources: " + resourceCouldBeUnreserved
|
||||
+ ", capacity-without-reserved: "
|
||||
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
|
||||
+ currentLimitResource);
|
||||
// when total-used-without-reserved-resource < currentLimit, we still
|
||||
// have chance to allocate on this node by unreserving some containers
|
||||
if (Resources.lessThan(resourceCalculator, clusterResource,
|
||||
newTotalWithoutReservedResource, currentLimitResource)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"try to use reserved: " + getQueueName() + " usedResources: "
|
||||
+ queueUsage.getUsed() + ", clusterResources: "
|
||||
+ clusterResource + ", reservedResources: "
|
||||
+ resourceCouldBeUnreserved
|
||||
+ ", capacity-without-reserved: "
|
||||
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
|
||||
+ currentLimitResource);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getQueueName() + "Check assign to queue, nodePartition="
|
||||
+ nodePartition + " usedResources: " + queueUsage
|
||||
.getUsed(nodePartition) + " clusterResources: " + clusterResource
|
||||
+ " currentUsedCapacity " + Resources
|
||||
.divide(resourceCalculator, clusterResource,
|
||||
queueUsage.getUsed(nodePartition), labelManager
|
||||
.getResourceByLabel(nodePartition, clusterResource))
|
||||
+ " max-capacity: " + queueCapacities
|
||||
.getAbsoluteMaximumCapacity(nodePartition) + ")");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getQueueName()
|
||||
+ "Check assign to queue, nodePartition="
|
||||
+ nodePartition
|
||||
+ " usedResources: "
|
||||
+ queueUsage.getUsed(nodePartition)
|
||||
+ " clusterResources: "
|
||||
+ clusterResource
|
||||
+ " currentUsedCapacity "
|
||||
+ Resources.divide(resourceCalculator, clusterResource,
|
||||
queueUsage.getUsed(nodePartition),
|
||||
labelManager.getResourceByLabel(nodePartition, clusterResource))
|
||||
+ " max-capacity: "
|
||||
+ queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
|
||||
}
|
||||
return false;
|
||||
return true;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
return true;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -79,76 +79,98 @@ public class PlanQueue extends ParentQueue {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void reinitialize(CSQueue newlyParsedQueue,
|
||||
public void reinitialize(CSQueue newlyParsedQueue,
|
||||
Resource clusterResource) throws IOException {
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof PlanQueue)
|
||||
|| !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException("Trying to reinitialize " + getQueuePath()
|
||||
+ " from " + newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
try {
|
||||
writeLock.lock();
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
|
||||
.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException(
|
||||
"Trying to reinitialize " + getQueuePath() + " from "
|
||||
+ newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
|
||||
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
|
||||
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
|
||||
|
||||
if (newlyParsedParentQueue.getChildQueues().size() > 0) {
|
||||
throw new IOException(
|
||||
"Reservable Queue should not have sub-queues in the"
|
||||
+ "configuration");
|
||||
}
|
||||
if (newlyParsedParentQueue.getChildQueues().size() > 0) {
|
||||
throw new IOException(
|
||||
"Reservable Queue should not have sub-queues in the"
|
||||
+ "configuration");
|
||||
}
|
||||
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource);
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource);
|
||||
|
||||
updateQuotas(newlyParsedParentQueue.userLimit,
|
||||
newlyParsedParentQueue.userLimitFactor,
|
||||
newlyParsedParentQueue.maxAppsForReservation,
|
||||
newlyParsedParentQueue.maxAppsPerUserForReservation);
|
||||
updateQuotas(newlyParsedParentQueue.userLimit,
|
||||
newlyParsedParentQueue.userLimitFactor,
|
||||
newlyParsedParentQueue.maxAppsForReservation,
|
||||
newlyParsedParentQueue.maxAppsPerUserForReservation);
|
||||
|
||||
// run reinitialize on each existing queue, to trigger absolute cap
|
||||
// recomputations
|
||||
for (CSQueue res : this.getChildQueues()) {
|
||||
res.reinitialize(res, clusterResource);
|
||||
}
|
||||
showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues;
|
||||
}
|
||||
|
||||
synchronized void addChildQueue(CSQueue newQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
if (newQueue.getCapacity() > 0) {
|
||||
throw new SchedulerDynamicEditException("Queue " + newQueue
|
||||
+ " being added has non zero capacity.");
|
||||
}
|
||||
boolean added = this.childQueues.add(newQueue);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("updateChildQueues (action: add queue): " + added + " "
|
||||
+ getChildQueuesToPrint());
|
||||
// run reinitialize on each existing queue, to trigger absolute cap
|
||||
// recomputations
|
||||
for (CSQueue res : this.getChildQueues()) {
|
||||
res.reinitialize(res, clusterResource);
|
||||
}
|
||||
showReservationsAsQueues =
|
||||
newlyParsedParentQueue.showReservationsAsQueues;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void removeChildQueue(CSQueue remQueue)
|
||||
void addChildQueue(CSQueue newQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
if (remQueue.getCapacity() > 0) {
|
||||
throw new SchedulerDynamicEditException("Queue " + remQueue
|
||||
+ " being removed has non zero capacity.");
|
||||
try {
|
||||
writeLock.lock();
|
||||
if (newQueue.getCapacity() > 0) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Queue " + newQueue + " being added has non zero capacity.");
|
||||
}
|
||||
boolean added = this.childQueues.add(newQueue);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("updateChildQueues (action: add queue): " + added + " "
|
||||
+ getChildQueuesToPrint());
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
Iterator<CSQueue> qiter = childQueues.iterator();
|
||||
while (qiter.hasNext()) {
|
||||
CSQueue cs = qiter.next();
|
||||
if (cs.equals(remQueue)) {
|
||||
qiter.remove();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removed child queue: {}", cs.getQueueName());
|
||||
}
|
||||
|
||||
void removeChildQueue(CSQueue remQueue)
|
||||
throws SchedulerDynamicEditException {
|
||||
try {
|
||||
writeLock.lock();
|
||||
if (remQueue.getCapacity() > 0) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Queue " + remQueue + " being removed has non zero capacity.");
|
||||
}
|
||||
Iterator<CSQueue> qiter = childQueues.iterator();
|
||||
while (qiter.hasNext()) {
|
||||
CSQueue cs = qiter.next();
|
||||
if (cs.equals(remQueue)) {
|
||||
qiter.remove();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removed child queue: {}", cs.getQueueName());
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized float sumOfChildCapacities() {
|
||||
float ret = 0;
|
||||
for (CSQueue l : childQueues) {
|
||||
ret += l.getCapacity();
|
||||
protected float sumOfChildCapacities() {
|
||||
try {
|
||||
writeLock.lock();
|
||||
float ret = 0;
|
||||
for (CSQueue l : childQueues) {
|
||||
ret += l.getCapacity();
|
||||
}
|
||||
return ret;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private void updateQuotas(int userLimit, float userLimitFactor,
|
||||
|
@ -51,22 +51,28 @@ public class ReservationQueue extends LeafQueue {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void reinitialize(CSQueue newlyParsedQueue,
|
||||
public void reinitialize(CSQueue newlyParsedQueue,
|
||||
Resource clusterResource) throws IOException {
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof ReservationQueue)
|
||||
|| !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException("Trying to reinitialize " + getQueuePath()
|
||||
+ " from " + newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
super.reinitialize(newlyParsedQueue, clusterResource);
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
try {
|
||||
writeLock.lock();
|
||||
// Sanity check
|
||||
if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue
|
||||
.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException(
|
||||
"Trying to reinitialize " + getQueuePath() + " from "
|
||||
+ newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
super.reinitialize(newlyParsedQueue, clusterResource);
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
minimumAllocation, this, labelManager, null);
|
||||
|
||||
updateQuotas(parent.getUserLimitForReservation(),
|
||||
parent.getUserLimitFactor(),
|
||||
parent.getMaxApplicationsForReservations(),
|
||||
parent.getMaxApplicationsPerUserForReservation());
|
||||
updateQuotas(parent.getUserLimitForReservation(),
|
||||
parent.getUserLimitFactor(),
|
||||
parent.getMaxApplicationsForReservations(),
|
||||
parent.getMaxApplicationsPerUserForReservation());
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -77,21 +83,26 @@ public class ReservationQueue extends LeafQueue {
|
||||
* maxCapacity, etc..)
|
||||
* @throws SchedulerDynamicEditException
|
||||
*/
|
||||
public synchronized void setEntitlement(QueueEntitlement entitlement)
|
||||
public void setEntitlement(QueueEntitlement entitlement)
|
||||
throws SchedulerDynamicEditException {
|
||||
float capacity = entitlement.getCapacity();
|
||||
if (capacity < 0 || capacity > 1.0f) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Capacity demand is not in the [0,1] range: " + capacity);
|
||||
}
|
||||
setCapacity(capacity);
|
||||
setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
|
||||
// note: we currently set maxCapacity to capacity
|
||||
// this might be revised later
|
||||
setMaxCapacity(entitlement.getMaxCapacity());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("successfully changed to " + capacity + " for queue "
|
||||
+ this.getQueueName());
|
||||
try {
|
||||
writeLock.lock();
|
||||
float capacity = entitlement.getCapacity();
|
||||
if (capacity < 0 || capacity > 1.0f) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Capacity demand is not in the [0,1] range: " + capacity);
|
||||
}
|
||||
setCapacity(capacity);
|
||||
setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
|
||||
// note: we currently set maxCapacity to capacity
|
||||
// this might be revised later
|
||||
setMaxCapacity(entitlement.getMaxCapacity());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("successfully changed to " + capacity + " for queue " + this
|
||||
.getQueueName());
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -328,7 +328,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||
* of the resources that will be allocated to and preempted from this
|
||||
* application.
|
||||
*
|
||||
* @param rc
|
||||
* @param resourceCalculator
|
||||
* @param clusterResource
|
||||
* @param minimumAllocation
|
||||
* @return an allocation
|
||||
|
@ -566,6 +566,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println(" <defaultQueueSchedulingPolicy>drf" +
|
||||
"</defaultQueueSchedulingPolicy>");
|
||||
out.println(" <queue name=\"queueA\">");
|
||||
out.println(" <aclSubmitReservations>" +
|
||||
"queueA_user,common_user " +
|
||||
|
@ -23,10 +23,8 @@ import static org.mockito.Mockito.mock;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||
@ -127,25 +125,6 @@ public class TestNoOverCommitPolicy {
|
||||
.generateAllocation(initTime, step, f), res, minAlloc), false);
|
||||
}
|
||||
|
||||
@Test(expected = MismatchedUserException.class)
|
||||
public void testUserMismatch() throws IOException, PlanningException {
|
||||
// generate allocation from single tenant that exceed capacity
|
||||
int[] f = generateData(3600, (int) (0.5 * totCont));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
ReservationId rid = ReservationSystemTestUtil.getNewReservationId();
|
||||
|
||||
plan.addReservation(new InMemoryReservationAllocation(rid, rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
||||
.generateAllocation(initTime, step, f), res, minAlloc), false);
|
||||
|
||||
// trying to update a reservation with a mismatching user
|
||||
plan.updateReservation(new InMemoryReservationAllocation(rid, rDef, "u2",
|
||||
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
||||
.generateAllocation(initTime, step, f), res, minAlloc));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiTenantPass() throws IOException, PlanningException {
|
||||
// generate allocation from multiple tenants that barely fit in tot capacity
|
||||
|
@ -828,8 +828,8 @@ public class TestContainerResizing {
|
||||
app.getAppAttemptResourceUsage().getPending().getMemorySize());
|
||||
// Queue/user/application's usage will be updated
|
||||
checkUsedResource(rm1, "default", 0 * GB, null);
|
||||
Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default"))
|
||||
.getUser("user").getUsed().getMemorySize());
|
||||
// User will be removed
|
||||
Assert.assertNull(((LeafQueue) cs.getQueue("default")).getUser("user"));
|
||||
Assert.assertEquals(0 * GB,
|
||||
app.getAppAttemptResourceUsage().getReserved().getMemorySize());
|
||||
Assert.assertEquals(0 * GB,
|
||||
|
@ -68,6 +68,8 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||
@ -205,6 +207,9 @@ public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
|
||||
Resource r = Resource.newInstance(1024, 1);
|
||||
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
MockRMApp m = new MockRMApp(appId.getId(), appId.getClusterTimestamp(),
|
||||
RMAppState.NEW);
|
||||
yarnCluster.getResourceManager().getRMContext().getRMApps().put(appId, m);
|
||||
ApplicationAttemptId validAppAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user