HDFS-6735. A minor optimization to avoid pread() be blocked by read() inside the same DFSInputStream (Lars Hofhansl via stack)
This commit is contained in:
parent
92ce6eda92
commit
7caa3bc98e
@ -422,6 +422,9 @@ Release 2.7.0 - UNRELEASED
|
||||
HDFS-7446. HDFS inotify should have the ability to determine what txid it
|
||||
has read up to (cmccabe)
|
||||
|
||||
HDFS-6735. A minor optimization to avoid pread() be blocked by read()
|
||||
inside the same DFSInputStream (Lars Hofhansl via stack)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -207,4 +207,13 @@
|
||||
<Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
|
||||
</Match>
|
||||
|
||||
<!--
|
||||
We use a separate lock to guard cachingStrategy in order to separate
|
||||
locks for p-reads from seek + read invocations.
|
||||
-->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.hdfs.DFSInputStream" />
|
||||
<Field name="cachingStrategy" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
|
@ -92,17 +92,32 @@ public class DFSInputStream extends FSInputStream
|
||||
private final DFSClient dfsClient;
|
||||
private boolean closed = false;
|
||||
private final String src;
|
||||
private BlockReader blockReader = null;
|
||||
private final boolean verifyChecksum;
|
||||
private LocatedBlocks locatedBlocks = null;
|
||||
private long lastBlockBeingWrittenLength = 0;
|
||||
private FileEncryptionInfo fileEncryptionInfo = null;
|
||||
|
||||
// state by stateful read only:
|
||||
// (protected by lock on this)
|
||||
/////
|
||||
private DatanodeInfo currentNode = null;
|
||||
private LocatedBlock currentLocatedBlock = null;
|
||||
private long pos = 0;
|
||||
private long blockEnd = -1;
|
||||
private BlockReader blockReader = null;
|
||||
////
|
||||
|
||||
// state shared by stateful and positional read:
|
||||
// (protected by lock on infoLock)
|
||||
////
|
||||
private LocatedBlocks locatedBlocks = null;
|
||||
private long lastBlockBeingWrittenLength = 0;
|
||||
private FileEncryptionInfo fileEncryptionInfo = null;
|
||||
private CachingStrategy cachingStrategy;
|
||||
////
|
||||
|
||||
private final ReadStatistics readStatistics = new ReadStatistics();
|
||||
// lock for state shared between read and pread
|
||||
// Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
|
||||
// (it's OK to acquire this lock when the lock on <this> is held)
|
||||
private final Object infoLock = new Object();
|
||||
|
||||
/**
|
||||
* Track the ByteBuffers that we have handed out to readers.
|
||||
@ -226,15 +241,17 @@ void addToDeadNodes(DatanodeInfo dnInfo) {
|
||||
this.dfsClient = dfsClient;
|
||||
this.verifyChecksum = verifyChecksum;
|
||||
this.src = src;
|
||||
this.cachingStrategy =
|
||||
dfsClient.getDefaultReadCachingStrategy();
|
||||
synchronized (infoLock) {
|
||||
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
|
||||
}
|
||||
openInfo();
|
||||
}
|
||||
|
||||
/**
|
||||
* Grab the open-file info from namenode
|
||||
*/
|
||||
synchronized void openInfo() throws IOException, UnresolvedLinkException {
|
||||
void openInfo() throws IOException, UnresolvedLinkException {
|
||||
synchronized(infoLock) {
|
||||
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
|
||||
int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
|
||||
while (retriesForLastBlockLength > 0) {
|
||||
@ -257,6 +274,7 @@ synchronized void openInfo() throws IOException, UnresolvedLinkException {
|
||||
throw new IOException("Could not obtain the last block locations.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void waitFor(int waitTime) throws IOException {
|
||||
try {
|
||||
@ -306,7 +324,6 @@ private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
|
||||
|
||||
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
|
||||
|
||||
currentNode = null;
|
||||
return lastBlockBeingWrittenLength;
|
||||
}
|
||||
|
||||
@ -359,21 +376,25 @@ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
|
||||
throw new IOException("Cannot obtain block length for " + locatedblock);
|
||||
}
|
||||
|
||||
public synchronized long getFileLength() {
|
||||
public long getFileLength() {
|
||||
synchronized(infoLock) {
|
||||
return locatedBlocks == null? 0:
|
||||
locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
|
||||
}
|
||||
}
|
||||
|
||||
// Short circuit local reads are forbidden for files that are
|
||||
// under construction. See HDFS-2757.
|
||||
synchronized boolean shortCircuitForbidden() {
|
||||
boolean shortCircuitForbidden() {
|
||||
synchronized(infoLock) {
|
||||
return locatedBlocks.isUnderConstruction();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the datanode from which the stream is currently reading.
|
||||
*/
|
||||
public DatanodeInfo getCurrentDatanode() {
|
||||
public synchronized DatanodeInfo getCurrentDatanode() {
|
||||
return currentNode;
|
||||
}
|
||||
|
||||
@ -403,8 +424,9 @@ public synchronized List<LocatedBlock> getAllBlocks() throws IOException {
|
||||
* @return located block
|
||||
* @throws IOException
|
||||
*/
|
||||
private synchronized LocatedBlock getBlockAt(long offset,
|
||||
private LocatedBlock getBlockAt(long offset,
|
||||
boolean updatePosition) throws IOException {
|
||||
synchronized(infoLock) {
|
||||
assert (locatedBlocks != null) : "locatedBlocks is null";
|
||||
|
||||
final LocatedBlock blk;
|
||||
@ -437,15 +459,21 @@ else if (offset >= locatedBlocks.getFileLength()) {
|
||||
|
||||
// update current position
|
||||
if (updatePosition) {
|
||||
// synchronized not strictly needed, since we only get here
|
||||
// from synchronized caller methods
|
||||
synchronized(this) {
|
||||
pos = offset;
|
||||
blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
|
||||
currentLocatedBlock = blk;
|
||||
}
|
||||
}
|
||||
return blk;
|
||||
}
|
||||
}
|
||||
|
||||
/** Fetch a block from namenode and cache it */
|
||||
private synchronized void fetchBlockAt(long offset) throws IOException {
|
||||
private void fetchBlockAt(long offset) throws IOException {
|
||||
synchronized(infoLock) {
|
||||
int targetBlockIdx = locatedBlocks.findBlock(offset);
|
||||
if (targetBlockIdx < 0) { // block is not cached
|
||||
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
|
||||
@ -457,6 +485,7 @@ private synchronized void fetchBlockAt(long offset) throws IOException {
|
||||
}
|
||||
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get blocks in the specified range.
|
||||
@ -467,7 +496,7 @@ private synchronized void fetchBlockAt(long offset) throws IOException {
|
||||
* @return consequent segment of located blocks
|
||||
* @throws IOException
|
||||
*/
|
||||
private synchronized List<LocatedBlock> getBlockRange(long offset,
|
||||
private List<LocatedBlock> getBlockRange(long offset,
|
||||
long length) throws IOException {
|
||||
// getFileLength(): returns total file length
|
||||
// locatedBlocks.getFileLength(): returns length of completed blocks
|
||||
@ -475,7 +504,7 @@ private synchronized List<LocatedBlock> getBlockRange(long offset,
|
||||
throw new IOException("Offset: " + offset +
|
||||
" exceeds file length: " + getFileLength());
|
||||
}
|
||||
|
||||
synchronized(infoLock) {
|
||||
final List<LocatedBlock> blocks;
|
||||
final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
|
||||
final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
|
||||
@ -496,14 +525,16 @@ private synchronized List<LocatedBlock> getBlockRange(long offset,
|
||||
|
||||
return blocks;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get blocks in the specified range.
|
||||
* Includes only the complete blocks.
|
||||
* Fetch them from the namenode if not cached.
|
||||
*/
|
||||
private synchronized List<LocatedBlock> getFinalizedBlockRange(
|
||||
private List<LocatedBlock> getFinalizedBlockRange(
|
||||
long offset, long length) throws IOException {
|
||||
synchronized(infoLock) {
|
||||
assert (locatedBlocks != null) : "locatedBlocks is null";
|
||||
List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
|
||||
// search cached blocks first
|
||||
@ -532,6 +563,7 @@ private synchronized List<LocatedBlock> getFinalizedBlockRange(
|
||||
}
|
||||
return blockRange;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a DataInputStream to a DataNode so that it can be read from.
|
||||
@ -573,6 +605,12 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
|
||||
try {
|
||||
ExtendedBlock blk = targetBlock.getBlock();
|
||||
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
|
||||
CachingStrategy curCachingStrategy;
|
||||
boolean shortCircuitForbidden;
|
||||
synchronized(infoLock) {
|
||||
curCachingStrategy = cachingStrategy;
|
||||
shortCircuitForbidden = shortCircuitForbidden();
|
||||
}
|
||||
blockReader = new BlockReaderFactory(dfsClient.getConf()).
|
||||
setInetSocketAddress(targetAddr).
|
||||
setRemotePeerFactory(dfsClient).
|
||||
@ -585,8 +623,8 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
|
||||
setVerifyChecksum(verifyChecksum).
|
||||
setClientName(dfsClient.clientName).
|
||||
setLength(blk.getNumBytes() - offsetIntoBlock).
|
||||
setCachingStrategy(cachingStrategy).
|
||||
setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
|
||||
setCachingStrategy(curCachingStrategy).
|
||||
setAllowShortCircuitLocalReads(!shortCircuitForbidden).
|
||||
setClientCacheContext(dfsClient.getClientContext()).
|
||||
setUserGroupInformation(dfsClient.ugi).
|
||||
setConfiguration(dfsClient.getConfiguration()).
|
||||
@ -782,7 +820,7 @@ private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
|
||||
}
|
||||
}
|
||||
|
||||
private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
|
||||
private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
|
||||
dfsClient.checkOpen();
|
||||
if (closed) {
|
||||
throw new IOException("Stream closed");
|
||||
@ -800,10 +838,12 @@ private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws I
|
||||
currentNode = blockSeekTo(pos);
|
||||
}
|
||||
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
|
||||
synchronized(infoLock) {
|
||||
if (locatedBlocks.isLastBlockComplete()) {
|
||||
realLen = (int) Math.min(realLen,
|
||||
locatedBlocks.getFileLength() - pos);
|
||||
}
|
||||
}
|
||||
int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
|
||||
|
||||
if (result >= 0) {
|
||||
@ -1055,8 +1095,8 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
|
||||
// start of the loop.
|
||||
CachingStrategy curCachingStrategy;
|
||||
boolean allowShortCircuitLocalReads;
|
||||
synchronized (this) {
|
||||
block = getBlockAt(block.getStartOffset(), false);
|
||||
synchronized(infoLock) {
|
||||
curCachingStrategy = cachingStrategy;
|
||||
allowShortCircuitLocalReads = !shortCircuitForbidden();
|
||||
}
|
||||
@ -1488,7 +1528,7 @@ public synchronized void seek(long targetPos) throws IOException {
|
||||
* Same as {@link #seekToNewSource(long)} except that it does not exclude
|
||||
* the current datanode and might connect to the same node.
|
||||
*/
|
||||
private synchronized boolean seekToBlockSource(long targetPos)
|
||||
private boolean seekToBlockSource(long targetPos)
|
||||
throws IOException {
|
||||
currentNode = blockSeekTo(targetPos);
|
||||
return true;
|
||||
@ -1575,11 +1615,13 @@ public synchronized ReadStatistics getReadStatistics() {
|
||||
return new ReadStatistics(readStatistics);
|
||||
}
|
||||
|
||||
public synchronized FileEncryptionInfo getFileEncryptionInfo() {
|
||||
public FileEncryptionInfo getFileEncryptionInfo() {
|
||||
synchronized(infoLock) {
|
||||
return fileEncryptionInfo;
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void closeCurrentBlockReader() {
|
||||
private void closeCurrentBlockReader() {
|
||||
if (blockReader == null) return;
|
||||
// Close the current block reader so that the new caching settings can
|
||||
// take effect immediately.
|
||||
@ -1594,18 +1636,20 @@ private synchronized void closeCurrentBlockReader() {
|
||||
@Override
|
||||
public synchronized void setReadahead(Long readahead)
|
||||
throws IOException {
|
||||
synchronized (infoLock) {
|
||||
this.cachingStrategy =
|
||||
new CachingStrategy.Builder(this.cachingStrategy).
|
||||
setReadahead(readahead).build();
|
||||
new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
|
||||
}
|
||||
closeCurrentBlockReader();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setDropBehind(Boolean dropBehind)
|
||||
throws IOException {
|
||||
synchronized (infoLock) {
|
||||
this.cachingStrategy =
|
||||
new CachingStrategy.Builder(this.cachingStrategy).
|
||||
setDropBehind(dropBehind).build();
|
||||
new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
|
||||
}
|
||||
closeCurrentBlockReader();
|
||||
}
|
||||
|
||||
|
@ -34,14 +34,17 @@ public class LocatedBlocks {
|
||||
private final long fileLength;
|
||||
private final List<LocatedBlock> blocks; // array of blocks with prioritized locations
|
||||
private final boolean underConstruction;
|
||||
private LocatedBlock lastLocatedBlock = null;
|
||||
private boolean isLastBlockComplete = false;
|
||||
private FileEncryptionInfo fileEncryptionInfo = null;
|
||||
private final LocatedBlock lastLocatedBlock;
|
||||
private final boolean isLastBlockComplete;
|
||||
private final FileEncryptionInfo fileEncryptionInfo;
|
||||
|
||||
public LocatedBlocks() {
|
||||
fileLength = 0;
|
||||
blocks = null;
|
||||
underConstruction = false;
|
||||
lastLocatedBlock = null;
|
||||
isLastBlockComplete = false;
|
||||
fileEncryptionInfo = null;
|
||||
}
|
||||
|
||||
public LocatedBlocks(long flength, boolean isUnderConstuction,
|
||||
|
@ -400,7 +400,7 @@ private void ref(ShortCircuitReplica replica) {
|
||||
lock.lock();
|
||||
try {
|
||||
Preconditions.checkArgument(replica.refCount > 0,
|
||||
"can't ref " + replica + " because its refCount reached " +
|
||||
"can't ref %s because its refCount reached %d", replica,
|
||||
replica.refCount);
|
||||
Long evictableTimeNs = replica.getEvictableTimeNs();
|
||||
replica.refCount++;
|
||||
@ -456,14 +456,13 @@ void unref(ShortCircuitReplica replica) {
|
||||
if (newRefCount == 0) {
|
||||
// Close replica, since there are no remaining references to it.
|
||||
Preconditions.checkArgument(replica.purged,
|
||||
"Replica " + replica + " reached a refCount of 0 without " +
|
||||
"being purged");
|
||||
"Replica %s reached a refCount of 0 without being purged", replica);
|
||||
replica.close();
|
||||
} else if (newRefCount == 1) {
|
||||
Preconditions.checkState(null == replica.getEvictableTimeNs(),
|
||||
"Replica " + replica + " had a refCount higher than 1, " +
|
||||
"but was still evictable (evictableTimeNs = " +
|
||||
replica.getEvictableTimeNs() + ")");
|
||||
"Replica %s had a refCount higher than 1, " +
|
||||
"but was still evictable (evictableTimeNs = %d)",
|
||||
replica, replica.getEvictableTimeNs());
|
||||
if (!replica.purged) {
|
||||
// Add the replica to the end of an eviction list.
|
||||
// Eviction lists are sorted by time.
|
||||
@ -478,8 +477,8 @@ void unref(ShortCircuitReplica replica) {
|
||||
}
|
||||
} else {
|
||||
Preconditions.checkArgument(replica.refCount >= 0,
|
||||
"replica's refCount went negative (refCount = " +
|
||||
replica.refCount + " for " + replica + ")");
|
||||
"replica's refCount went negative (refCount = %d" +
|
||||
" for %s)", replica.refCount, replica);
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + ": unref replica " + replica +
|
||||
@ -602,7 +601,7 @@ private void removeEvictable(ShortCircuitReplica replica,
|
||||
Preconditions.checkNotNull(evictableTimeNs);
|
||||
ShortCircuitReplica removed = map.remove(evictableTimeNs);
|
||||
Preconditions.checkState(removed == replica,
|
||||
"failed to make " + replica + " unevictable");
|
||||
"failed to make %s unevictable", replica);
|
||||
replica.setEvictableTimeNs(null);
|
||||
}
|
||||
|
||||
@ -859,7 +858,7 @@ ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
|
||||
Condition cond = (Condition)replica.mmapData;
|
||||
cond.awaitUninterruptibly();
|
||||
} else {
|
||||
Preconditions.checkState(false, "invalid mmapData type " +
|
||||
Preconditions.checkState(false, "invalid mmapData type %s",
|
||||
replica.mmapData.getClass().getName());
|
||||
}
|
||||
}
|
||||
|
@ -243,19 +243,23 @@ void close() {
|
||||
String suffix = "";
|
||||
|
||||
Preconditions.checkState(refCount == 0,
|
||||
"tried to close replica with refCount " + refCount + ": " + this);
|
||||
"tried to close replica with refCount %d: %s", refCount, this);
|
||||
refCount = -1;
|
||||
Preconditions.checkState(purged,
|
||||
"tried to close unpurged replica " + this);
|
||||
"tried to close unpurged replica %s", this);
|
||||
if (hasMmap()) {
|
||||
munmap();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
suffix += " munmapped.";
|
||||
}
|
||||
}
|
||||
IOUtils.cleanup(LOG, dataStream, metaStream);
|
||||
if (slot != null) {
|
||||
cache.scheduleSlotReleaser(slot);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
suffix += " scheduling " + slot + " for later release.";
|
||||
}
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("closed " + this + suffix);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user