HDFS-16262. Async refresh of cached locations in DFSInputStream (#3527)
(cherry picked from commit 94b884ae55)

This commit is contained in:
parent 728ed10a7c
commit bd13d73334
ClientContext.java

@@ -69,6 +69,11 @@ public class ClientContext {
   */
  private final String name;

+  /**
+   * The client conf used to initialize context.
+   */
+  private final DfsClientConf dfsClientConf;
+
  /**
   * String representation of the configuration.
   */
@@ -130,6 +135,17 @@ public class ClientContext {
   */
  private volatile DeadNodeDetector deadNodeDetector = null;

+  /**
+   * The switch for the {@link LocatedBlocksRefresher}.
+   */
+  private final boolean locatedBlocksRefresherEnabled;
+
+  /**
+   * Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing
+   * registered {@link DFSInputStream}s, to take advantage of changes in block placement.
+   */
+  private volatile LocatedBlocksRefresher locatedBlocksRefresher = null;
+
  /**
   * Count the reference of ClientContext.
   */
@@ -146,6 +162,7 @@ private ClientContext(String name, DfsClientConf conf,
    final ShortCircuitConf scConf = conf.getShortCircuitConf();

    this.name = name;
+    this.dfsClientConf = conf;
    this.confString = scConf.confAsString();
    this.clientShortCircuitNum = conf.getClientShortCircuitNum();
    this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
@@ -164,6 +181,7 @@ private ClientContext(String name, DfsClientConf conf,
    this.byteArrayManager = ByteArrayManager.newInstance(
        conf.getWriteByteArrayManagerConf());
    this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
+    this.locatedBlocksRefresherEnabled = conf.isLocatedBlocksRefresherEnabled();
    initTopologyResolution(config);
  }

@@ -301,6 +319,21 @@ public DeadNodeDetector getDeadNodeDetector() {
    return deadNodeDetector;
  }

+  /**
+   * If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks
+   * of registered DFSInputStreams.
+   */
+  public boolean isLocatedBlocksRefresherEnabled() {
+    return locatedBlocksRefresherEnabled;
+  }
+
+  /**
+   * Obtain LocatedBlocksRefresher of the current client.
+   */
+  public LocatedBlocksRefresher getLocatedBlocksRefresher() {
+    return locatedBlocksRefresher;
+  }
+
  /**
   * Increment the counter. Start the dead node detector thread if there is no
   * reference.
@@ -311,6 +344,10 @@ synchronized void reference() {
      deadNodeDetector = new DeadNodeDetector(name, configuration);
      deadNodeDetector.start();
    }
+    if (locatedBlocksRefresherEnabled && locatedBlocksRefresher == null) {
+      locatedBlocksRefresher = new LocatedBlocksRefresher(name, configuration, dfsClientConf);
+      locatedBlocksRefresher.start();
+    }
  }

  /**
@@ -324,5 +361,10 @@ synchronized void unreference() {
      deadNodeDetector.shutdown();
      deadNodeDetector = null;
    }
+
+    if (counter == 0 && locatedBlocksRefresherEnabled && locatedBlocksRefresher != null) {
+      locatedBlocksRefresher.shutdown();
+      locatedBlocksRefresher = null;
+    }
  }
}
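The reference() / unreference() pair above ties the refresher daemon's lifetime to the number of DFSClients sharing this ClientContext: the first reference starts the thread, and the last unreference shuts it down. A minimal self-contained sketch of that pattern (illustrative names only, not the HDFS API):

// Sketch of the start-on-first-reference / stop-on-last-unreference
// pattern used by ClientContext above.
class RefCountedService {
  private int counter;
  private Thread worker;

  synchronized void reference() {
    counter++;
    if (worker == null) {
      worker = new Thread(() -> { /* periodic background work */ });
      worker.setDaemon(true);
      worker.start();
    }
  }

  synchronized void unreference() {
    counter--;
    if (counter == 0 && worker != null) {
      worker.interrupt(); // stop the daemon once nobody references the context
      worker = null;
    }
  }
}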
DFSClient.java

@@ -870,7 +870,7 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
  }

  public long getRefreshReadBlkLocationsInterval() {
-    return dfsClientConf.getRefreshReadBlockLocationsMS();
+    return dfsClientConf.getLocatedBlocksRefresherInterval();
  }

  public LocatedBlocks getLocatedBlocks(String src, long start)
@@ -3402,4 +3402,36 @@ private boolean isDeadNodeDetectionEnabled() {
  public DeadNodeDetector getDeadNodeDetector() {
    return clientContext.getDeadNodeDetector();
  }
+
+  /**
+   * Obtain LocatedBlocksRefresher of the current client.
+   */
+  public LocatedBlocksRefresher getLocatedBlockRefresher() {
+    return clientContext.getLocatedBlocksRefresher();
+  }
+
+  /**
+   * Adds the {@link DFSInputStream} to the {@link LocatedBlocksRefresher}, so that
+   * the underlying {@link LocatedBlocks} is periodically refreshed.
+   */
+  public void addLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
+    if (isLocatedBlocksRefresherEnabled()) {
+      clientContext.getLocatedBlocksRefresher().addInputStream(dfsInputStream);
+    }
+  }
+
+  /**
+   * Removes the {@link DFSInputStream} from the {@link LocatedBlocksRefresher}, so that
+   * the underlying {@link LocatedBlocks} is no longer periodically refreshed.
+   * @param dfsInputStream
+   */
+  public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
+    if (isLocatedBlocksRefresherEnabled()) {
+      clientContext.getLocatedBlocksRefresher().removeInputStream(dfsInputStream);
+    }
+  }
+
+  private boolean isLocatedBlocksRefresherEnabled() {
+    return clientContext.isLocatedBlocksRefresherEnabled();
+  }
}
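When dfs.client.refresh.read-block-locations.register-automatically is set to false, streams are refreshed only if registered by hand through the two methods added above. A hedged usage sketch, assuming an already-constructed DFSClient and an existing path (error handling elided; illustrative, not part of this commit):

import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;

// Illustrative only: manual registration when automatic registration is
// disabled but dfs.client.refresh.read-block-locations.ms is > 0.
public class ManualRefreshExample {
  static void readWithRefresh(DFSClient client, String path) throws Exception {
    DFSInputStream in = client.open(path);
    try {
      client.addLocatedBlocksRefresh(in);    // begin periodic refreshes
      // ... long-running reads; cached locations stay current ...
    } finally {
      client.removeLocatedBlocksRefresh(in); // stop refreshing
      in.close();
    }
  }
}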
DFSInputStream.java

@@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -66,6 +67,7 @@
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -128,18 +130,18 @@ public class DFSInputStream extends FSInputStream
  private long lastBlockBeingWrittenLength = 0;
  private FileEncryptionInfo fileEncryptionInfo = null;
  protected CachingStrategy cachingStrategy;
+  // this is volatile because it will be polled outside the lock,
+  // but still only updated within the lock
+  private volatile long lastRefreshedBlocksAt = Time.monotonicNow();
+
+  private AtomicBoolean refreshingBlockLocations = new AtomicBoolean(false);
  protected final ReadStatistics readStatistics = new ReadStatistics();
  // lock for state shared between read and pread
  // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
  // (it's OK to acquire this lock when the lock on <this> is held)
  protected final Object infoLock = new Object();

-  // refresh locatedBlocks periodically
-  private final long refreshReadBlockIntervals;
-  /** timeStamp of the last time a block location was refreshed. */
-  private long locatedBlocksTimeStamp;
  /**
   * Track the ByteBuffers that we have handed out to readers.
   *
@@ -156,10 +158,6 @@ public class DFSInputStream extends FSInputStream
    return extendedReadBuffers;
  }

-  private boolean isPeriodicRefreshEnabled() {
-    return (refreshReadBlockIntervals > 0L);
-  }
-
  /**
   * This variable tracks the number of failures since the start of the
   * most recent user-facing operation. That is to say, it should be reset
@@ -206,9 +204,6 @@ protected DFSClient getDFSClient() {
  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
      LocatedBlocks locatedBlocks) throws IOException {
    this.dfsClient = dfsClient;
-    this.refreshReadBlockIntervals =
-        this.dfsClient.getRefreshReadBlkLocationsInterval();
-    setLocatedBlocksTimeStamp();
    this.verifyChecksum = verifyChecksum;
    this.src = src;
    synchronized (infoLock) {
@@ -228,19 +223,6 @@ boolean deadNodesContain(DatanodeInfo nodeInfo) {
    return deadNodes.containsKey(nodeInfo);
  }

-  @VisibleForTesting
-  void setReadTimeStampsForTesting(long timeStamp) {
-    setLocatedBlocksTimeStamp(timeStamp);
-  }
-
-  private void setLocatedBlocksTimeStamp() {
-    setLocatedBlocksTimeStamp(Time.monotonicNow());
-  }
-
-  private void setLocatedBlocksTimeStamp(long timeStamp) {
-    this.locatedBlocksTimeStamp = timeStamp;
-  }
-
  /**
   * Grab the open-file info from namenode
   * @param refreshLocatedBlocks whether to re-fetch locatedblocks
@@ -248,33 +230,50 @@ private void setLocatedBlocksTimeStamp(long timeStamp) {
  void openInfo(boolean refreshLocatedBlocks) throws IOException {
    final DfsClientConf conf = dfsClient.getConf();
    synchronized(infoLock) {
-      lastBlockBeingWrittenLength =
-          fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
      int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
-      while (retriesForLastBlockLength > 0) {
+
+      while (true) {
+        LocatedBlocks newLocatedBlocks;
+        if (locatedBlocks == null || refreshLocatedBlocks) {
+          newLocatedBlocks = fetchAndCheckLocatedBlocks(locatedBlocks);
+        } else {
+          newLocatedBlocks = locatedBlocks;
+        }
+
+        long lastBlockLength = getLastBlockLength(newLocatedBlocks);
+        if (lastBlockLength != -1) {
+          setLocatedBlocksFields(newLocatedBlocks, lastBlockLength);
+          return;
+        }
+
        // Getting last block length as -1 is a special case. When cluster
        // restarts, DNs may not report immediately. At this time partial block
        // locations will not be available with NN for getting the length. Lets
        // retry for 3 times to get the length.
-        if (lastBlockBeingWrittenLength == -1) {
-          DFSClient.LOG.warn("Last block locations not available. "
-              + "Datanodes might not have reported blocks completely."
-              + " Will retry for " + retriesForLastBlockLength + " times");
-          waitFor(conf.getRetryIntervalForGetLastBlockLength());
-          lastBlockBeingWrittenLength =
-              fetchLocatedBlocksAndGetLastBlockLength(true);
-        } else {
-          break;
+
+        if (retriesForLastBlockLength-- <= 0) {
+          throw new IOException("Could not obtain the last block locations.");
        }
-        retriesForLastBlockLength--;
-      }
-      if (lastBlockBeingWrittenLength == -1
-          && retriesForLastBlockLength == 0) {
-        throw new IOException("Could not obtain the last block locations.");
+
+        DFSClient.LOG.warn("Last block locations not available. "
+            + "Datanodes might not have reported blocks completely."
+            + " Will retry for " + retriesForLastBlockLength + " times");
+        waitFor(conf.getRetryIntervalForGetLastBlockLength());
      }
    }
  }

+  /**
+   * Set locatedBlocks and related fields, using the passed lastBlockLength.
+   * Should be called within infoLock.
+   */
+  private void setLocatedBlocksFields(LocatedBlocks locatedBlocksToSet, long lastBlockLength) {
+    locatedBlocks = locatedBlocksToSet;
+    lastBlockBeingWrittenLength = lastBlockLength;
+    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
+    setLastRefreshedBlocksAt();
+  }
+
  private void waitFor(int waitTime) throws IOException {
    try {
      Thread.sleep(waitTime);
@@ -285,62 +284,18 @@ private void waitFor(int waitTime) throws IOException {
    }
  }

-  /**
-   * Checks whether the block locations timestamps have expired.
-   * In the case of expired timestamp:
-   * - clear list of deadNodes
-   * - call openInfo(true) which will re-fetch locatedblocks
-   * - update locatedBlocksTimeStamp
-   * @return true when the expiration feature is enabled and locatedblocks
-   *         timestamp has expired.
-   * @throws IOException
-   */
-  private boolean isLocatedBlocksExpired() {
-    if (!isPeriodicRefreshEnabled()) {
-      return false;
-    }
-    long now = Time.monotonicNow();
-    long elapsed = now - locatedBlocksTimeStamp;
-    if (elapsed < refreshReadBlockIntervals) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Update the block locations timestamps if they have expired.
-   * In the case of expired timestamp:
-   * - clear list of deadNodes
-   * - call openInfo(true) which will re-fetch locatedblocks
-   * - update locatedBlocksTimeStamp
-   * @return true when the locatedblocks list is re-fetched from the namenode.
-   * @throws IOException
-   */
-  private boolean updateBlockLocationsStamp() throws IOException {
-    if (!isLocatedBlocksExpired()) {
-      return false;
-    }
-    // clear dead nodes
-    deadNodes.clear();
-    openInfo(true);
-    setLocatedBlocksTimeStamp();
-    return true;
-  }
-
-  private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
+  private LocatedBlocks fetchAndCheckLocatedBlocks(LocatedBlocks existing)
      throws IOException {
-    LocatedBlocks newInfo = locatedBlocks;
-    if (locatedBlocks == null || refresh) {
-      newInfo = dfsClient.getLocatedBlocks(src, 0);
-    }
+    LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
+
    DFSClient.LOG.debug("newInfo = {}", newInfo);
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

-    if (locatedBlocks != null) {
+    if (existing != null) {
      Iterator<LocatedBlock> oldIter =
-          locatedBlocks.getLocatedBlocks().iterator();
+          existing.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
@@ -348,17 +303,14 @@ private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
        }
      }
    }
-    locatedBlocks = newInfo;
-    long lastBlkBeingWrittenLength = getLastBlockLength();
-    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
-
-    return lastBlkBeingWrittenLength;
+    return newInfo;
  }

-  private long getLastBlockLength() throws IOException{
+  private long getLastBlockLength(LocatedBlocks blocks) throws IOException{
    long lastBlockBeingWrittenLength = 0;
-    if (!locatedBlocks.isLastBlockComplete()) {
-      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+    if (!blocks.isLastBlockComplete()) {
+      final LocatedBlock last = blocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
@@ -501,6 +453,14 @@ public List<LocatedBlock> getAllBlocks() throws IOException {
    return getBlockRange(0, getFileLength());
  }

+  protected String getSrc() {
+    return src;
+  }
+
+  protected LocatedBlocks getLocatedBlocks() {
+    return locatedBlocks;
+  }
+
  /**
   * Get block at the specified position.
   * Fetch it from the namenode if not cached.
@@ -543,8 +503,8 @@ protected LocatedBlock fetchBlockAt(long offset) throws IOException {
  /** Fetch a block from namenode and cache it */
  private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
      throws IOException {
+    maybeRegisterBlockRefresh();
    synchronized(infoLock) {
-      updateBlockLocationsStamp();
      int targetBlockIdx = locatedBlocks.findBlock(offset);
      if (targetBlockIdx < 0) { // block is not cached
        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
@@ -559,8 +519,7 @@ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
      }
      // Update the LastLocatedBlock, if offset is for last block.
      if (offset >= locatedBlocks.getFileLength()) {
-        locatedBlocks = newBlocks;
-        lastBlockBeingWrittenLength = getLastBlockLength();
+        setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
      } else {
        locatedBlocks.insertRange(targetBlockIdx,
            newBlocks.getLocatedBlocks());
@@ -587,6 +546,7 @@ private List<LocatedBlock> getBlockRange(long offset,
      throw new IOException("Offset: " + offset +
          " exceeds file length: " + getFileLength());
    }

    synchronized(infoLock) {
      final List<LocatedBlock> blocks;
      final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
@@ -644,6 +604,9 @@ private synchronized DatanodeInfo blockSeekTo(long target)
    if (target >= getFileLength()) {
      throw new IOException("Attempted to read past end of file");
    }

+    maybeRegisterBlockRefresh();
+
    // Will be getting a new BlockReader.
    closeCurrentBlockReaders();

@@ -657,9 +620,6 @@ private synchronized DatanodeInfo blockSeekTo(long target)
    boolean connectFailedOnce = false;

    while (true) {
-      // Re-fetch the locatedBlocks from NN if the timestamp has expired.
-      updateBlockLocationsStamp();
-
      //
      // Compute desired block
      //
@@ -793,6 +753,7 @@ public void accept(ByteBuffer k, Object v) {
       * this dfsInputStream anymore.
       */
      dfsClient.removeNodeFromDeadNodeDetector(this, locatedBlocks);
+      maybeDeRegisterBlockRefresh();
    }
  }

@@ -871,16 +832,16 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
    int len = strategy.getTargetLength();
    CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
    failures = 0;

+    maybeRegisterBlockRefresh();
+
    if (pos < getFileLength()) {
      int retries = 2;
      while (retries > 0) {
        try {
          // currentNode can be left as null if previous read had a checksum
          // error on the same block. See HDFS-3067
-          // currentNode needs to be updated if the blockLocations timestamp has
-          // expired.
-          if (pos > blockEnd || currentNode == null
-              || updateBlockLocationsStamp()) {
+          if (pos > blockEnd || currentNode == null) {
            currentNode = blockSeekTo(pos);
          }
          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
@@ -1958,4 +1919,153 @@ public boolean hasCapability(String capability) {
      return false;
    }
  }

+  /**
+   * Many DFSInputStreams can be opened and closed in quick succession, in which case
+   * they would be registered/deregistered but never need to be refreshed.
+   * Defers registering with the located block refresher, in order to avoid an additional
+   * source of unnecessary synchronization for short-lived DFSInputStreams.
+   */
+  protected void maybeRegisterBlockRefresh() {
+    if (!dfsClient.getConf().isRefreshReadBlockLocationsAutomatically()
+        || !dfsClient.getConf().isLocatedBlocksRefresherEnabled()) {
+      return;
+    }
+
+    if (refreshingBlockLocations.get()) {
+      return;
+    }
+
+    // not enough time elapsed to refresh
+    long timeSinceLastRefresh = Time.monotonicNow() - lastRefreshedBlocksAt;
+    if (timeSinceLastRefresh < dfsClient.getConf().getLocatedBlocksRefresherInterval()) {
+      return;
+    }
+
+    if (!refreshingBlockLocations.getAndSet(true)) {
+      dfsClient.addLocatedBlocksRefresh(this);
+    }
+  }
+
+  /**
+   * De-register periodic refresh of this inputstream, if it was added to begin with.
+   */
+  private void maybeDeRegisterBlockRefresh() {
+    if (refreshingBlockLocations.get()) {
+      dfsClient.removeLocatedBlocksRefresh(this);
+    }
+  }
+
+  /**
+   * Refresh blocks for the input stream, if necessary.
+   *
+   * @param addressCache optional map to use as a cache for resolving datanode InetSocketAddress
+   * @return whether a refresh was performed or not
+   */
+  boolean refreshBlockLocations(Map<String, InetSocketAddress> addressCache) {
+    LocatedBlocks blocks;
+    synchronized (infoLock) {
+      blocks = getLocatedBlocks();
+    }
+
+    if (getLocalDeadNodes().isEmpty() && allBlocksLocal(blocks, addressCache)) {
+      return false;
+    }
+
+    try {
+      DFSClient.LOG.debug("Refreshing {} for path {}", this, getSrc());
+      LocatedBlocks newLocatedBlocks = fetchAndCheckLocatedBlocks(blocks);
+      long lastBlockLength = getLastBlockLength(newLocatedBlocks);
+      if (lastBlockLength == -1) {
+        DFSClient.LOG.debug(
+            "Discarding refreshed blocks for path {} because lastBlockLength was -1",
+            getSrc());
+        return true;
+      }
+
+      setRefreshedValues(newLocatedBlocks, lastBlockLength);
+    } catch (IOException e) {
+      DFSClient.LOG.debug("Failed to refresh DFSInputStream for path {}", getSrc(), e);
+    }

+    return true;
+  }
+
+  /**
+   * Once new LocatedBlocks have been fetched, sets them on the DFSInputStream and
+   * updates stateful read location within the necessary locks.
+   */
+  private synchronized void setRefreshedValues(LocatedBlocks blocks, long lastBlockLength)
+      throws IOException {
+    synchronized (infoLock) {
+      setLocatedBlocksFields(blocks, lastBlockLength);
+    }
+
+    getLocalDeadNodes().clear();
+
+    // if a stateful read has been initialized, refresh it
+    if (currentNode != null) {
+      currentNode = blockSeekTo(pos);
+    }
+  }
+
+  private boolean allBlocksLocal(LocatedBlocks blocks,
+      Map<String, InetSocketAddress> addressCache) {
+    if (addressCache == null) {
+      addressCache = new HashMap<>();
+    }
+
+    // we only need to check the first location of each block, because the blocks are already
+    // sorted by distance from the current host
+    for (LocatedBlock lb : blocks.getLocatedBlocks()) {
+      if (lb.getLocations().length == 0) {
+        return false;
+      }
+
+      DatanodeInfoWithStorage location = lb.getLocations()[0];
+      if (location == null) {
+        return false;
+      }
+
+      InetSocketAddress targetAddr = addressCache.computeIfAbsent(
+          location.getDatanodeUuid(),
+          unused -> {
+            String dnAddr = location.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
+            return NetUtils.createSocketAddr(
+                dnAddr,
+                -1,
+                null,
+                dfsClient.getConf().isUriCacheEnabled());
+          });
+
+      if (!isResolveableAndLocal(targetAddr)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean isResolveableAndLocal(InetSocketAddress targetAddr) {
+    try {
+      return DFSUtilClient.isLocalAddress(targetAddr);
+    } catch (IOException e) {
+      DFSClient.LOG.debug("Got an error checking if {} is local", targetAddr, e);
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void setLastRefreshedBlocksAtForTesting(long timestamp) {
+    lastRefreshedBlocksAt = timestamp;
+  }
+
+  @VisibleForTesting
+  long getLastRefreshedBlocksAtForTesting() {
+    return lastRefreshedBlocksAt;
+  }
+
+  private void setLastRefreshedBlocksAt() {
+    lastRefreshedBlocksAt = Time.monotonicNow();
+  }
}
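maybeRegisterBlockRefresh() deliberately waits one full refresh interval before registering a stream, so short-lived streams never touch the refresher's synchronized set. A self-contained sketch of that check-then-register guard (illustrative names, not the HDFS API):

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the deferred-registration guard in maybeRegisterBlockRefresh() above.
class DeferredRegistration {
  private final AtomicBoolean registered = new AtomicBoolean(false);
  private volatile long lastRefreshedAt = System.currentTimeMillis();
  private final long intervalMs;

  DeferredRegistration(long intervalMs) {
    this.intervalMs = intervalMs;
  }

  void maybeRegister(Runnable register) {
    if (registered.get()) {
      return;                       // already registered; nothing to do
    }
    if (System.currentTimeMillis() - lastRefreshedAt < intervalMs) {
      return;                       // stream is still short-lived; defer registration
    }
    if (!registered.getAndSet(true)) {
      register.run();               // getAndSet makes this happen at most once
    }
  }
}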
DFSStripedInputStream.java

@@ -143,14 +143,6 @@ protected ByteBuffer getCurStripeBuf() {
    return curStripeBuf;
  }

-  protected String getSrc() {
-    return src;
-  }
-
-  protected LocatedBlocks getLocatedBlocks() {
-    return locatedBlocks;
-  }
-
  protected ByteBufferPool getBufferPool() {
    return BUFFER_POOL;
  }
@@ -168,6 +160,8 @@ synchronized void blockSeekTo(long target) throws IOException {
      throw new IOException("Attempted to read past end of file");
    }

+    maybeRegisterBlockRefresh();
+
    // Will be getting a new BlockReader.
    closeCurrentBlockReaders();

LocatedBlocksRefresher.java (new file)

@@ -0,0 +1,210 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically refresh the underlying cached {@link LocatedBlocks} for eligible registered
 * {@link DFSInputStream}s. DFSInputStreams are eligible for refreshing if they have any
 * deadNodes or any blocks are lacking local replicas.
 * Disabled by default, unless an interval is configured.
 */
public class LocatedBlocksRefresher extends Daemon {
  private static final Logger LOG =
      LoggerFactory.getLogger(LocatedBlocksRefresher.class);

  private static final String THREAD_PREFIX = "located-block-refresher-";

  private final String name;
  private final long interval;
  private final long jitter;
  private final ExecutorService refreshThreadPool;

  // Use WeakHashMap so that we don't hold onto references that might have not been explicitly
  // closed because they were created and thrown away.
  private final Set<DFSInputStream> registeredInputStreams =
      Collections.newSetFromMap(new WeakHashMap<>());

  private int runCount;
  private int refreshCount;

  LocatedBlocksRefresher(String name, Configuration conf, DfsClientConf dfsClientConf) {
    this.name = name;
    this.interval = dfsClientConf.getLocatedBlocksRefresherInterval();
    this.jitter = Math.round(this.interval * 0.1);
    int rpcThreads = conf.getInt(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY,
        DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT);

    String threadPrefix;
    if (name.equals(DFS_CLIENT_CONTEXT_DEFAULT)) {
      threadPrefix = THREAD_PREFIX;
    } else {
      threadPrefix = THREAD_PREFIX + name + "-";
    }

    this.refreshThreadPool = Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory() {
      private final AtomicInteger threadIndex = new AtomicInteger(0);

      @Override
      public Thread newThread(Runnable r) {
        Thread t = super.newThread(r);
        t.setName(threadPrefix + threadIndex.getAndIncrement());
        return t;
      }
    });

    setName(threadPrefix + "main");

    LOG.info("Start located block refresher for DFSClient {}.", this.name);
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {

      if (!waitForInterval()) {
        return;
      }

      LOG.debug("Running refresh for {} streams", registeredInputStreams.size());
      long start = Time.monotonicNow();
      AtomicInteger neededRefresh = new AtomicInteger(0);

      Phaser phaser = new Phaser(1);

      Map<String, InetSocketAddress> addressCache = new ConcurrentHashMap<>();

      for (DFSInputStream inputStream : getInputStreams()) {
        phaser.register();
        refreshThreadPool.submit(() -> {
          try {
            if (isInputStreamTracked(inputStream) &&
                inputStream.refreshBlockLocations(addressCache)) {
              neededRefresh.incrementAndGet();
            }
          } finally {
            phaser.arriveAndDeregister();
          }
        });
      }

      phaser.arriveAndAwaitAdvance();

      synchronized (this) {
        runCount++;
        refreshCount += neededRefresh.get();
      }

      LOG.debug(
          "Finished refreshing {} of {} streams in {}ms",
          neededRefresh,
          registeredInputStreams.size(),
          Time.monotonicNow() - start
      );
    }
  }

  public synchronized int getRunCount() {
    return runCount;
  }

  public synchronized int getRefreshCount() {
    return refreshCount;
  }

  private boolean waitForInterval() {
    try {
      Thread.sleep(interval + ThreadLocalRandom.current().nextLong(-jitter, jitter));
      return true;
    } catch (InterruptedException e) {
      LOG.debug("Interrupted during wait interval", e);
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /**
   * Shutdown all the threads.
   */
  public void shutdown() {
    if (isAlive()) {
      interrupt();
      try {
        join();
      } catch (InterruptedException e) {
      }
    }
    refreshThreadPool.shutdown();
  }

  /**
   * Collects the DFSInputStreams to a list within synchronization, so that we can iterate them
   * without potentially blocking callers to {@link #addInputStream(DFSInputStream)} or
   * {@link #removeInputStream(DFSInputStream)}. We don't care so much about missing additions,
   * and we'll guard against removals by doing an additional
   * {@link #isInputStreamTracked(DFSInputStream)} check during iteration.
   */
  private synchronized Collection<DFSInputStream> getInputStreams() {
    return new ArrayList<>(registeredInputStreams);
  }

  public synchronized void addInputStream(DFSInputStream dfsInputStream) {
    LOG.trace("Registering {} for {}", dfsInputStream, dfsInputStream.getSrc());
    registeredInputStreams.add(dfsInputStream);
  }

  public synchronized void removeInputStream(DFSInputStream dfsInputStream) {
    if (isInputStreamTracked(dfsInputStream)) {
      LOG.trace("De-registering {} for {}", dfsInputStream, dfsInputStream.getSrc());
      registeredInputStreams.remove(dfsInputStream);
    }
  }

  public synchronized boolean isInputStreamTracked(DFSInputStream dfsInputStream) {
    return registeredInputStreams.contains(dfsInputStream);
  }

  public long getInterval() {
    return interval;
  }
}
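The run() loop above fans each registered stream out to the thread pool and uses a Phaser as a reusable, dynamically-sized latch: the coordinating thread holds one party from construction, each submitted task registers a party and arrives when finished, and arriveAndAwaitAdvance() blocks until every task is done. A minimal standalone sketch of that coordination pattern (illustrative names, not HDFS API):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserFanOut {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Party count starts at 1 for the coordinating thread itself.
    Phaser phaser = new Phaser(1);

    for (int i = 0; i < 10; i++) {
      int task = i;
      phaser.register();              // one party per submitted task
      pool.submit(() -> {
        try {
          System.out.println("refreshing stream " + task);
        } finally {
          phaser.arriveAndDeregister(); // task done, drop its party
        }
      });
    }

    // Arrive as the coordinator and wait until all tasks have arrived.
    phaser.arriveAndAwaitAdvance();
    System.out.println("all refreshes finished");
    pool.shutdown();
  }
}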
HdfsClientConfigKeys.java

@@ -199,6 +199,19 @@ public interface HdfsClientConfigKeys {
      "dfs.client.refresh.read-block-locations.ms";
  long DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT = 0L;

+  // Number of threads to use for refreshing LocatedBlocks of registered
+  // DFSInputStreams. If a DFSClient opens many DFSInputStreams, increasing
+  // this may help refresh them all in a timely manner.
+  String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY =
+      "dfs.client.refresh.read-block-locations.threads";
+  int DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT = 5;
+
+  // Whether to auto-register all DFSInputStreams for background refreshes.
+  // If false, user must manually register using DFSClient#addLocatedBlocksRefresh(DFSInputStream)
+  String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_KEY =
+      "dfs.client.refresh.read-block-locations.register-automatically";
+  boolean DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_DEFAULT = true;
+
  String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
      "dfs.datanode.kerberos.principal";
  String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
DfsClientConf.java

@@ -144,6 +144,7 @@ public class DfsClientConf {

  /** wait time window before refreshing blocklocation for inputstream. */
  private final long refreshReadBlockLocationsMS;
+  private final boolean refreshReadBlockLocationsAutomatically;

  private final ShortCircuitConf shortCircuitConf;
  private final int clientShortCircuitNum;
@@ -266,6 +267,10 @@ public DfsClientConf(Configuration conf) {
        HdfsClientConfigKeys.
            DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);

+    refreshReadBlockLocationsAutomatically = conf.getBoolean(
+        HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_KEY,
+        HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_DEFAULT);
+
    hedgedReadThresholdMillis = conf.getLong(
        HedgedRead.THRESHOLD_MILLIS_KEY,
        HedgedRead.THRESHOLD_MILLIS_DEFAULT);
@@ -696,13 +701,18 @@ public boolean isReadUseCachePriority() {
    return replicaAccessorBuilderClasses;
  }

-  /**
-   * @return the replicaAccessorBuilderClasses
-   */
-  public long getRefreshReadBlockLocationsMS() {
+  public boolean isLocatedBlocksRefresherEnabled() {
+    return refreshReadBlockLocationsMS > 0;
+  }
+
+  public long getLocatedBlocksRefresherInterval() {
    return refreshReadBlockLocationsMS;
  }

+  public boolean isRefreshReadBlockLocationsAutomatically() {
+    return refreshReadBlockLocationsAutomatically;
+  }
+
  /**
   * @return the shortCircuitConf
   */
hdfs-default.xml

@@ -3260,6 +3260,25 @@
  </description>
</property>

+<property>
+  <name>dfs.client.refresh.read-block-locations.register-automatically</name>
+  <value>true</value>
+  <description>
+    Whether to auto-register all DFSInputStreams for background LocatedBlock refreshes.
+    If false, user must manually register using DFSClient#addLocatedBlocksRefresh(DFSInputStream)
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.refresh.read-block-locations.threads</name>
+  <value>5</value>
+  <description>
+    Number of threads to use for refreshing LocatedBlocks of registered
+    DFSInputStreams. If a DFSClient opens many DFSInputStreams, increasing
+    this may help refresh them all in a timely manner.
+  </description>
+</property>
+
<property>
  <name>dfs.namenode.lease-recheck-interval-ms</name>
  <value>2000</value>
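Taken together with the pre-existing dfs.client.refresh.read-block-locations.ms key, a non-zero interval is what enables the feature. A hedged sketch of enabling it programmatically (the key strings match the properties above; the 30-second value is arbitrary):

import org.apache.hadoop.conf.Configuration;

public class RefreshConfigExample {
  public static Configuration refreshEnabledConf() {
    Configuration conf = new Configuration();
    // A non-zero interval turns the LocatedBlocksRefresher on.
    conf.setLong("dfs.client.refresh.read-block-locations.ms", 30_000L);
    // Defaults shown explicitly: auto-register streams, 5 refresh threads.
    conf.setBoolean("dfs.client.refresh.read-block-locations.register-automatically", true);
    conf.setInt("dfs.client.refresh.read-block-locations.threads", 5);
    return conf;
  }
}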
TestDFSInputStreamBlockLocations.java

@@ -21,28 +21,25 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.Time;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -137,154 +134,111 @@ public void teardown() throws IOException {
  }

  @Test
-  public void testRead() throws Exception {
+  public void testRefreshBlockLocations() throws IOException {
    final String fileName = "/test_cache_locations";
-    filePath = new Path(fileName);
+    filePath = createFile(fileName);
+
+    try (DFSInputStream fin = dfsClient.open(fileName)) {
+      LocatedBlocks existing = fin.locatedBlocks;
+      long lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();
+
+      assertFalse("should not have attempted refresh",
+          fin.refreshBlockLocations(null));
+      assertEquals("should not have updated lastRefreshedAt",
+          lastRefreshedAt, fin.getLastRefreshedBlocksAtForTesting());
+      assertSame("should not have modified locatedBlocks",
+          existing, fin.locatedBlocks);
+
+      // fake a dead node to force refresh
+      // refreshBlockLocations should return true, indicating we attempted a refresh
+      // nothing should be changed, because locations have not changed
+      fin.addToLocalDeadNodes(dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]);
+      assertTrue("should have attempted refresh",
+          fin.refreshBlockLocations(null));
+      verifyChanged(fin, existing, lastRefreshedAt);
+
+      // reset
+      lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();
+      existing = fin.locatedBlocks;
+
+      // It's hard to test explicitly for non-local nodes, but we can fake it
+      // because we also treat unresolved as non-local. Pass in a cache where all the datanodes
+      // are unresolved hosts.
+      Map<String, InetSocketAddress> mockAddressCache = new HashMap<>();
+      InetSocketAddress unresolved = InetSocketAddress.createUnresolved("www.google.com", 80);
+      for (DataNode dataNode : dfsCluster.getDataNodes()) {
+        mockAddressCache.put(dataNode.getDatanodeUuid(), unresolved);
+      }
+
+      assertTrue("should have attempted refresh",
+          fin.refreshBlockLocations(mockAddressCache));
+      verifyChanged(fin, existing, lastRefreshedAt);
+    }
+  }
+
+  private void verifyChanged(DFSInputStream fin, LocatedBlocks existing, long lastRefreshedAt) {
+    assertTrue("lastRefreshedAt should have incremented",
+        fin.getLastRefreshedBlocksAtForTesting() > lastRefreshedAt);
+    assertNotSame("located blocks should have changed",
+        existing, fin.locatedBlocks);
+    assertTrue("deadNodes should be empty",
+        fin.getLocalDeadNodes().isEmpty());
+  }
+
+  @Test
+  public void testDeferredRegistrationStatefulRead() throws IOException {
+    testWithRegistrationMethod(DFSInputStream::read);
+  }
+
+  @Test
+  public void testDeferredRegistrationPositionalRead() throws IOException {
+    testWithRegistrationMethod(fin -> fin.readFully(0, new byte[1]));
+  }
+
+  @Test
+  public void testDeferredRegistrationGetAllBlocks() throws IOException {
+    testWithRegistrationMethod(DFSInputStream::getAllBlocks);
+  }
+
+  @FunctionalInterface
+  interface ThrowingConsumer {
+    void accept(DFSInputStream fin) throws IOException;
+  }
+
+  private void testWithRegistrationMethod(ThrowingConsumer registrationMethod) throws IOException {
+    final String fileName = "/test_cache_locations";
+    filePath = createFile(fileName);

    DFSInputStream fin = null;
-    FSDataOutputStream fout = null;
    try {
-      // create a file and write for testing
-      fout = fs.create(filePath, REPLICATION_FACTOR);
-      fout.write(new byte[(fileLength)]);
-      // finalize the file by closing the output stream
-      fout.close();
-      fout = null;
-      // get the located blocks
-      LocatedBlocks referenceLocatedBlocks =
-          dfsClient.getLocatedBlocks(fileName, 0, fileLength);
-      assertEquals(numOfBlocks, referenceLocatedBlocks.locatedBlockCount());
-      String poolId = dfsCluster.getNamesystem().getBlockPoolId();
      fin = dfsClient.open(fileName);
-      // get the located blocks from fin
-      LocatedBlocks finLocatedBlocks = fin.locatedBlocks;
-      assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
-          finLocatedBlocks.locatedBlockCount());
-      final int chunkReadSize = BLOCK_SIZE / 4;
-      byte[] readBuffer = new byte[chunkReadSize];
-      // read the first block
-      DatanodeInfo prevDNInfo = null;
-      DatanodeInfo currDNInfo = null;
-      int bytesRead = 0;
-      int firstBlockMark = BLOCK_SIZE;
-      // get the first block locations
-      LocatedBlock firstLocatedBlk =
-          fin.locatedBlocks.getLocatedBlocks().get(0);
-      DatanodeInfo[] firstBlkDNInfos = firstLocatedBlk.getLocations();
-      while (fin.getPos() < firstBlockMark) {
-        bytesRead = fin.read(readBuffer);
-        Assert.assertTrue("Unexpected number of read bytes",
-            chunkReadSize >= bytesRead);
-        if (currDNInfo == null) {
-          currDNInfo = fin.getCurrentDatanode();
-          assertNotNull("current FIS datanode is null", currDNInfo);
-          continue;
-        }
-        prevDNInfo = currDNInfo;
-        currDNInfo = fin.getCurrentDatanode();
-        assertEquals("the DFSInput stream does not read from same node",
-            prevDNInfo, currDNInfo);
-      }
+      assertFalse("should not be tracking input stream on open",
+          dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));

-      assertEquals("InputStream exceeds expected position",
-          firstBlockMark, fin.getPos());
-      // get the second block locations
-      LocatedBlock secondLocatedBlk =
-          fin.locatedBlocks.getLocatedBlocks().get(1);
-      // get the nodeinfo for that block
-      DatanodeInfo[] secondBlkDNInfos = secondLocatedBlk.getLocations();
-      DatanodeInfo deadNodeInfo = secondBlkDNInfos[0];
-      // stop the datanode in the list of the
-      DataNode deadNode = getdataNodeFromHostName(dfsCluster,
-          deadNodeInfo.getHostName());
-      // Shutdown and wait for datanode to be marked dead
-      DatanodeRegistration reg = InternalDataNodeTestUtils.
-          getDNRegistrationForBP(dfsCluster.getDataNodes().get(0), poolId);
-      DataNodeProperties stoppedDNProps =
-          dfsCluster.stopDataNode(deadNodeInfo.getName());
+      // still not registered because it hasn't been an hour by the time we call this
+      registrationMethod.accept(fin);
+      assertFalse("should not be tracking input stream after first read",
+          dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));

-      List<DataNode> datanodesPostStoppage = dfsCluster.getDataNodes();
-      assertEquals(NUM_DATA_NODES - 1, datanodesPostStoppage.size());
-      // get the located blocks
-      LocatedBlocks afterStoppageLocatedBlocks =
-          dfsClient.getLocatedBlocks(fileName, 0, fileLength);
-      // read second block
-      int secondBlockMark = (int) (1.5 * BLOCK_SIZE);
-      boolean firstIteration = true;
-      if (this.enableBlkExpiration) {
-        // set the time stamps to make sure that we do not refresh locations yet
-        fin.setReadTimeStampsForTesting(Time.monotonicNow());
-      }
-      while (fin.getPos() < secondBlockMark) {
-        bytesRead = fin.read(readBuffer);
-        assertTrue("dead node used to read at position: " + fin.getPos(),
-            fin.deadNodesContain(deadNodeInfo));
-        Assert.assertTrue("Unexpected number of read bytes",
-            chunkReadSize >= bytesRead);
-        prevDNInfo = currDNInfo;
-        currDNInfo = fin.getCurrentDatanode();
-        assertNotEquals(deadNodeInfo, currDNInfo);
-        if (firstIteration) {
-          // currDNInfo has to be different unless first block locs is different
-          assertFalse("FSInputStream should pick a different DN",
-              firstBlkDNInfos[0].equals(deadNodeInfo)
-                  && prevDNInfo.equals(currDNInfo));
-          firstIteration = false;
-        }
-      }
-      assertEquals("InputStream exceeds expected position",
-          secondBlockMark, fin.getPos());
-      // restart the dead node with the same port
-      assertTrue(dfsCluster.restartDataNode(stoppedDNProps, true));
-      dfsCluster.waitActive();
-      List<DataNode> datanodesPostRestart = dfsCluster.getDataNodes();
-      assertEquals(NUM_DATA_NODES, datanodesPostRestart.size());
-      // continue reading from block 2 again. We should read from deadNode
-      int thirdBlockMark = 2 * BLOCK_SIZE;
-      firstIteration = true;
-      while (fin.getPos() < thirdBlockMark) {
-        bytesRead = fin.read(readBuffer);
-        if (this.enableBlkExpiration) {
-          assertEquals("node is removed from deadNodes after 1st iteration",
-              firstIteration, fin.deadNodesContain(deadNodeInfo));
-        } else {
-          assertTrue(fin.deadNodesContain(deadNodeInfo));
-        }
-        Assert.assertTrue("Unexpected number of read bytes",
-            chunkReadSize >= bytesRead);
-        prevDNInfo = currDNInfo;
-        currDNInfo = fin.getCurrentDatanode();
-        if (!this.enableBlkExpiration) {
-          assertNotEquals(deadNodeInfo, currDNInfo);
-        }
-        if (firstIteration) {
-          assertEquals(prevDNInfo, currDNInfo);
-          firstIteration = false;
-          if (this.enableBlkExpiration) {
-            // reset the time stamps of located blocks to force cache expiration
-            fin.setReadTimeStampsForTesting(
-                Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
-          }
-        }
-      }
-      assertEquals("InputStream exceeds expected position",
-          thirdBlockMark, fin.getPos());
+      // artificially make it have been an hour
+      fin.setLastRefreshedBlocksAtForTesting(Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
+      registrationMethod.accept(fin);
+      assertEquals("SHOULD be tracking input stream on read after interval, only if enabled",
+          enableBlkExpiration, dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
    } finally {
-      if (fout != null) {
-        fout.close();
-      }
      if (fin != null) {
        fin.close();
+        assertFalse(dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
      }
      fs.delete(filePath, true);
    }
  }

-  private DataNode getdataNodeFromHostName(MiniDFSCluster cluster,
-      String hostName) {
-    for (DataNode dn : cluster.getDataNodes()) {
-      if (dn.getDatanodeId().getHostName().equals(hostName)) {
-        return dn;
-      }
-    }
-    return null;
+  private Path createFile(String fileName) throws IOException {
+    Path path = new Path(fileName);
+    try (FSDataOutputStream fout = fs.create(path, REPLICATION_FACTOR)) {
+      fout.write(new byte[(fileLength)]);
+    }
+    return path;
  }
}
TestLocatedBlocksRefresher.java (new file)

@@ -0,0 +1,266 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestLocatedBlocksRefresher {
  private static final Logger LOG = LoggerFactory.getLogger(TestLocatedBlocksRefresher.class);

  private static final int BLOCK_SIZE = 1024 * 1024;
  private static final short REPLICATION_FACTOR = (short) 4;
  private static final String[] RACKS = new String[] {
      "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
  private static final int NUM_DATA_NODES = RACKS.length;

  private final int numOfBlocks = 24;
  private final int fileLength = numOfBlocks * BLOCK_SIZE;
  private final int dfsClientPrefetchSize = fileLength / 2;

  private MiniDFSCluster cluster;
  private Configuration conf;

  @Before
  public void setUp() throws Exception {
    cluster = null;
    conf = new HdfsConfiguration();

    // disable shortcircuit reading
    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
    // set replication factor
    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR);
    // set block size and other sizes
    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
        dfsClientPrefetchSize);
  }

  @After
  public void tearDown() throws Exception {
    if (cluster != null) {
      cluster.shutdown(true, true);
    }
  }

  private void setupTest(long refreshInterval) throws IOException {
    conf.setLong(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY, refreshInterval);

    // this is necessary to ensure no caching between runs
    conf.set("dfs.client.context", UUID.randomUUID().toString());

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
    cluster.waitActive();
  }

  @Test
  public void testDisabledOnZeroInterval() throws IOException {
    setupTest(0);
    assertNull(cluster.getFileSystem().getClient().getLocatedBlockRefresher());
  }

  @Test
  public void testEnabledOnNonZeroInterval() throws Exception {
    setupTest(1000);
    LocatedBlocksRefresher refresher =
        cluster.getFileSystem().getClient().getLocatedBlockRefresher();
    assertNotNull(refresher);
    assertNoMoreRefreshes(refresher);
  }

  @Test
  public void testRefreshOnDeadNodes() throws Exception {
    setupTest(1000);
    DistributedFileSystem fs = cluster.getFileSystem();
    DFSClient client = fs.getClient();
    LocatedBlocksRefresher refresher = client.getLocatedBlockRefresher();

    String fileName = createTestFile(fs);

    try (DFSInputStream fin = client.open(fileName)) {
      LocatedBlocks locatedBlocks = fin.locatedBlocks;
      assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
          locatedBlocks.locatedBlockCount());

      // should not be tracked yet
      assertFalse(refresher.isInputStreamTracked(fin));

      // track and verify
      refresher.addInputStream(fin);
      assertTrue(refresher.isInputStreamTracked(fin));

      // no refreshes yet, as nothing has happened
      assertNoMoreRefreshes(refresher);
      synchronized (fin.infoLock) {
        assertSame(locatedBlocks, fin.locatedBlocks);
      }

      stopNodeHostingBlocks(fin, NUM_DATA_NODES - 1);

      // read blocks, which should trigger dead node for the one we stopped
      int chunkReadSize = BLOCK_SIZE / 4;
      byte[] readBuffer = new byte[chunkReadSize];
      fin.read(0, readBuffer, 0, readBuffer.length);

      assertEquals(1, fin.getLocalDeadNodes().size());

      // we should get a refresh now
      assertRefreshes(refresher, 1);

      // verify that it actually changed things
      synchronized (fin.infoLock) {
        assertNotSame(locatedBlocks, fin.locatedBlocks);
        assertTrue(fin.getLocalDeadNodes().isEmpty());
      }

      // no more refreshes because everything is happy again
      assertNoMoreRefreshes(refresher);

      // stop another node, and try to trigger a new deadNode
      stopNodeHostingBlocks(fin, NUM_DATA_NODES - 2);
      readBuffer = new byte[chunkReadSize];
      fin.read(0, readBuffer, 0, readBuffer.length);

      // we should refresh again now, and verify
      // may actually be more than 1, since the first dead node
      // may still be listed in the replicas for the block
      assertTrue(fin.getLocalDeadNodes().size() > 0);

      assertRefreshes(refresher, 1);

      synchronized (fin.infoLock) {
        assertNotSame(locatedBlocks, fin.locatedBlocks);
        assertTrue(fin.getLocalDeadNodes().isEmpty());
      }

      // de-register, and expect no more refreshes below
      refresher.removeInputStream(fin);
    }

    assertNoMoreRefreshes(refresher);
  }

  private void stopNodeHostingBlocks(DFSInputStream fin, int expectedNodes) {
    synchronized (fin.infoLock) {
      int idx = fin.locatedBlocks.findBlock(0);
      for (int i = 0; i < REPLICATION_FACTOR; i++) {
        String deadNodeAddr = fin.locatedBlocks.get(idx).getLocations()[i].getXferAddr();

        DataNodeProperties dataNodeProperties = cluster.stopDataNode(deadNodeAddr);
        if (dataNodeProperties != null) {
          List<DataNode> datanodesPostStoppage = cluster.getDataNodes();
          assertEquals(expectedNodes, datanodesPostStoppage.size());
          return;
        }
      }

      throw new RuntimeException("Could not find a datanode to stop");
    }
  }

  private void assertNoMoreRefreshes(LocatedBlocksRefresher refresher) throws InterruptedException {
    long interval = refresher.getInterval();
    int runCount = refresher.getRunCount();
    int refreshCount = refresher.getRefreshCount();

    LOG.info("Waiting for at least {} runs, from current {}, expecting no refreshes",
        runCount + 3, runCount);
    // wait for it to run 3 times, with some buffer
    awaitWithTimeout(() -> refresher.getRunCount() > runCount + 3, 5 * interval);

    // it should not have refreshed anything, because no DFSInputStreams are registered anymore
    assertEquals(refreshCount, refresher.getRefreshCount());
  }

  private void assertRefreshes(LocatedBlocksRefresher refresher, int expectedRefreshes)
      throws InterruptedException {
    int runCount = refresher.getRunCount();
    int refreshCount = refresher.getRefreshCount();
    int expectedRuns = 3;

    if (expectedRefreshes < 0) {
      expectedRefreshes = expectedRuns;
    }

    LOG.info(
        "Waiting for at least {} runs, from current {}. Expecting {} refreshes, from current {}",
        runCount + expectedRuns, runCount, refreshCount + expectedRefreshes, refreshCount
    );

    // wait for it to run 3 times
    awaitWithTimeout(() -> refresher.getRunCount() >= runCount + expectedRuns, 10_000);

    // the values may not be identical due to any refreshes that occurred before we opened
    // the DFSInputStream, but the difference should be identical since we are refreshing
    // every time
    assertEquals(expectedRefreshes, refresher.getRefreshCount() - refreshCount);
  }

  private void awaitWithTimeout(Supplier<Boolean> test, long timeoutMillis)
      throws InterruptedException {
    long now = Time.monotonicNow();

    while (!test.get()) {
      if (Time.monotonicNow() - now > timeoutMillis) {
        fail("Timed out waiting for true condition");
        return;
      }

      Thread.sleep(50);
    }
  }

  private String createTestFile(FileSystem fs) throws IOException {
    String fileName = "/located_blocks_" + UUID.randomUUID().toString();
    Path filePath = new Path(fileName);
    try (FSDataOutputStream fout = fs.create(filePath, REPLICATION_FACTOR)) {
      fout.write(new byte[(fileLength)]);
    }
    fs.deleteOnExit(filePath);

    return fileName;
  }
}