HDFS-16262. Async refresh of cached locations in DFSInputStream (#3527)

This commit is contained in:
Bryan Beaudreault 2022-01-25 06:42:35 -05:00 committed by GitHub
parent 43153e80cb
commit 94b884ae55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 915 additions and 265 deletions

View File

@ -69,6 +69,11 @@ public class ClientContext {
*/
private final String name;
/**
* The client conf used to initialize context.
*/
private final DfsClientConf dfsClientConf;
/**
* String representation of the configuration.
*/
@ -130,6 +135,17 @@ public class ClientContext {
*/
private volatile DeadNodeDetector deadNodeDetector = null;
/**
* The switch for the {@link LocatedBlocksRefresher}.
*/
private final boolean locatedBlocksRefresherEnabled;
/**
* Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing
* registered {@link DFSInputStream}s, to take advantage of changes in block placement.
*/
private volatile LocatedBlocksRefresher locatedBlocksRefresher = null;
/**
* Count the reference of ClientContext.
*/
@ -146,6 +162,7 @@ private ClientContext(String name, DfsClientConf conf,
final ShortCircuitConf scConf = conf.getShortCircuitConf();
this.name = name;
this.dfsClientConf = conf;
this.confString = scConf.confAsString();
this.clientShortCircuitNum = conf.getClientShortCircuitNum();
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
@ -164,6 +181,7 @@ private ClientContext(String name, DfsClientConf conf,
this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf());
this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
this.locatedBlocksRefresherEnabled = conf.isLocatedBlocksRefresherEnabled();
initTopologyResolution(config);
}
@ -301,6 +319,21 @@ public DeadNodeDetector getDeadNodeDetector() {
return deadNodeDetector;
}
/**
* If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks
* of registered DFSInputStreams.
*/
public boolean isLocatedBlocksRefresherEnabled() {
return locatedBlocksRefresherEnabled;
}
/**
* Obtain LocatedBlocksRefresher of the current client.
*/
public LocatedBlocksRefresher getLocatedBlocksRefresher() {
return locatedBlocksRefresher;
}
/**
* Increment the counter. Start the dead node detector thread if there is no
* reference.
@ -311,6 +344,10 @@ synchronized void reference() {
deadNodeDetector = new DeadNodeDetector(name, configuration);
deadNodeDetector.start();
}
if (locatedBlocksRefresherEnabled && locatedBlocksRefresher == null) {
locatedBlocksRefresher = new LocatedBlocksRefresher(name, configuration, dfsClientConf);
locatedBlocksRefresher.start();
}
}
/**
@ -324,5 +361,10 @@ synchronized void unreference() {
deadNodeDetector.shutdown();
deadNodeDetector = null;
}
if (counter == 0 && locatedBlocksRefresherEnabled && locatedBlocksRefresher != null) {
locatedBlocksRefresher.shutdown();
locatedBlocksRefresher = null;
}
}
}

View File

@ -863,7 +863,7 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
}
public long getRefreshReadBlkLocationsInterval() {
return dfsClientConf.getRefreshReadBlockLocationsMS();
return dfsClientConf.getLocatedBlocksRefresherInterval();
}
/**
@ -3459,4 +3459,36 @@ private boolean isDeadNodeDetectionEnabled() {
public DeadNodeDetector getDeadNodeDetector() {
return clientContext.getDeadNodeDetector();
}
/**
* Obtain LocatedBlocksRefresher of the current client.
*/
public LocatedBlocksRefresher getLocatedBlockRefresher() {
return clientContext.getLocatedBlocksRefresher();
}
/**
* Adds the {@link DFSInputStream} to the {@link LocatedBlocksRefresher}, so that
* the underlying {@link LocatedBlocks} is periodically refreshed.
*/
public void addLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
if (isLocatedBlocksRefresherEnabled()) {
clientContext.getLocatedBlocksRefresher().addInputStream(dfsInputStream);
}
}
/**
* Removes the {@link DFSInputStream} from the {@link LocatedBlocksRefresher}, so that
* the underlying {@link LocatedBlocks} is no longer periodically refreshed.
* @param dfsInputStream
*/
public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
if (isLocatedBlocksRefresherEnabled()) {
clientContext.getLocatedBlocksRefresher().removeInputStream(dfsInputStream);
}
}
private boolean isLocatedBlocksRefresherEnabled() {
return clientContext.isLocatedBlocksRefresherEnabled();
}
}

View File

@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -65,6 +66,7 @@
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -128,18 +130,18 @@ public class DFSInputStream extends FSInputStream
private long lastBlockBeingWrittenLength = 0;
private FileEncryptionInfo fileEncryptionInfo = null;
protected CachingStrategy cachingStrategy;
// this is volatile because it will be polled outside the lock,
// but still only updated within the lock
private volatile long lastRefreshedBlocksAt = Time.monotonicNow();
////
private AtomicBoolean refreshingBlockLocations = new AtomicBoolean(false);
protected final ReadStatistics readStatistics = new ReadStatistics();
// lock for state shared between read and pread
// Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
// (it's OK to acquire this lock when the lock on <this> is held)
protected final Object infoLock = new Object();
// refresh locatedBlocks periodically
private final long refreshReadBlockIntervals;
/** timeStamp of the last time a block location was refreshed. */
private long locatedBlocksTimeStamp;
/**
* Track the ByteBuffers that we have handed out to readers.
*
@ -156,10 +158,6 @@ public class DFSInputStream extends FSInputStream
return extendedReadBuffers;
}
private boolean isPeriodicRefreshEnabled() {
return (refreshReadBlockIntervals > 0L);
}
/**
* This variable tracks the number of failures since the start of the
* most recent user-facing operation. That is to say, it should be reset
@ -206,9 +204,6 @@ protected DFSClient getDFSClient() {
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
LocatedBlocks locatedBlocks) throws IOException {
this.dfsClient = dfsClient;
this.refreshReadBlockIntervals =
this.dfsClient.getRefreshReadBlkLocationsInterval();
setLocatedBlocksTimeStamp();
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized (infoLock) {
@ -228,19 +223,6 @@ boolean deadNodesContain(DatanodeInfo nodeInfo) {
return deadNodes.containsKey(nodeInfo);
}
@VisibleForTesting
void setReadTimeStampsForTesting(long timeStamp) {
setLocatedBlocksTimeStamp(timeStamp);
}
private void setLocatedBlocksTimeStamp() {
setLocatedBlocksTimeStamp(Time.monotonicNow());
}
private void setLocatedBlocksTimeStamp(long timeStamp) {
this.locatedBlocksTimeStamp = timeStamp;
}
/**
* Grab the open-file info from namenode
* @param refreshLocatedBlocks whether to re-fetch locatedblocks
@ -248,33 +230,50 @@ private void setLocatedBlocksTimeStamp(long timeStamp) {
void openInfo(boolean refreshLocatedBlocks) throws IOException {
final DfsClientConf conf = dfsClient.getConf();
synchronized(infoLock) {
lastBlockBeingWrittenLength =
fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
while (retriesForLastBlockLength > 0) {
while (true) {
LocatedBlocks newLocatedBlocks;
if (locatedBlocks == null || refreshLocatedBlocks) {
newLocatedBlocks = fetchAndCheckLocatedBlocks(locatedBlocks);
} else {
newLocatedBlocks = locatedBlocks;
}
long lastBlockLength = getLastBlockLength(newLocatedBlocks);
if (lastBlockLength != -1) {
setLocatedBlocksFields(newLocatedBlocks, lastBlockLength);
return;
}
// Getting last block length as -1 is a special case. When cluster
// restarts, DNs may not report immediately. At this time partial block
// locations will not be available with NN for getting the length. Lets
// retry for 3 times to get the length.
if (lastBlockBeingWrittenLength == -1) {
DFSClient.LOG.warn("Last block locations not available. "
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(conf.getRetryIntervalForGetLastBlockLength());
lastBlockBeingWrittenLength =
fetchLocatedBlocksAndGetLastBlockLength(true);
} else {
break;
if (retriesForLastBlockLength-- <= 0) {
throw new IOException("Could not obtain the last block locations.");
}
retriesForLastBlockLength--;
}
if (lastBlockBeingWrittenLength == -1
&& retriesForLastBlockLength == 0) {
throw new IOException("Could not obtain the last block locations.");
DFSClient.LOG.warn("Last block locations not available. "
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(conf.getRetryIntervalForGetLastBlockLength());
}
}
}
/**
* Set locatedBlocks and related fields, using the passed lastBlockLength.
* Should be called within infoLock.
*/
private void setLocatedBlocksFields(LocatedBlocks locatedBlocksToSet, long lastBlockLength) {
locatedBlocks = locatedBlocksToSet;
lastBlockBeingWrittenLength = lastBlockLength;
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
setLastRefreshedBlocksAt();
}
private void waitFor(int waitTime) throws IOException {
try {
Thread.sleep(waitTime);
@ -285,62 +284,18 @@ private void waitFor(int waitTime) throws IOException {
}
}
/**
* Checks whether the block locations timestamps have expired.
* In the case of expired timestamp:
* - clear list of deadNodes
* - call openInfo(true) which will re-fetch locatedblocks
* - update locatedBlocksTimeStamp
* @return true when the expiration feature is enabled and locatedblocks
* timestamp has expired.
* @throws IOException
*/
private boolean isLocatedBlocksExpired() {
if (!isPeriodicRefreshEnabled()) {
return false;
}
long now = Time.monotonicNow();
long elapsed = now - locatedBlocksTimeStamp;
if (elapsed < refreshReadBlockIntervals) {
return false;
}
return true;
}
/**
* Update the block locations timestamps if they have expired.
* In the case of expired timestamp:
* - clear list of deadNodes
* - call openInfo(true) which will re-fetch locatedblocks
* - update locatedBlocksTimeStamp
* @return true when the locatedblocks list is re-fetched from the namenode.
* @throws IOException
*/
private boolean updateBlockLocationsStamp() throws IOException {
if (!isLocatedBlocksExpired()) {
return false;
}
// clear dead nodes
deadNodes.clear();
openInfo(true);
setLocatedBlocksTimeStamp();
return true;
}
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
private LocatedBlocks fetchAndCheckLocatedBlocks(LocatedBlocks existing)
throws IOException {
LocatedBlocks newInfo = locatedBlocks;
if (locatedBlocks == null || refresh) {
newInfo = dfsClient.getLocatedBlocks(src, 0);
}
LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
DFSClient.LOG.debug("newInfo = {}", newInfo);
if (newInfo == null) {
throw new IOException("Cannot open filename " + src);
}
if (locatedBlocks != null) {
if (existing != null) {
Iterator<LocatedBlock> oldIter =
locatedBlocks.getLocatedBlocks().iterator();
existing.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
@ -348,17 +303,14 @@ private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
}
}
}
locatedBlocks = newInfo;
long lastBlkBeingWrittenLength = getLastBlockLength();
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
return lastBlkBeingWrittenLength;
return newInfo;
}
private long getLastBlockLength() throws IOException{
private long getLastBlockLength(LocatedBlocks blocks) throws IOException{
long lastBlockBeingWrittenLength = 0;
if (!locatedBlocks.isLastBlockComplete()) {
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
if (!blocks.isLastBlockComplete()) {
final LocatedBlock last = blocks.getLastLocatedBlock();
if (last != null) {
if (last.getLocations().length == 0) {
if (last.getBlockSize() == 0) {
@ -501,6 +453,14 @@ public List<LocatedBlock> getAllBlocks() throws IOException {
return getBlockRange(0, getFileLength());
}
protected String getSrc() {
return src;
}
protected LocatedBlocks getLocatedBlocks() {
return locatedBlocks;
}
/**
* Get block at the specified position.
* Fetch it from the namenode if not cached.
@ -543,8 +503,8 @@ protected LocatedBlock fetchBlockAt(long offset) throws IOException {
/** Fetch a block from namenode and cache it */
private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
throws IOException {
maybeRegisterBlockRefresh();
synchronized(infoLock) {
updateBlockLocationsStamp();
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
@ -559,8 +519,7 @@ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
}
// Update the LastLocatedBlock, if offset is for last block.
if (offset >= locatedBlocks.getFileLength()) {
locatedBlocks = newBlocks;
lastBlockBeingWrittenLength = getLastBlockLength();
setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
} else {
locatedBlocks.insertRange(targetBlockIdx,
newBlocks.getLocatedBlocks());
@ -587,6 +546,7 @@ private List<LocatedBlock> getBlockRange(long offset,
throw new IOException("Offset: " + offset +
" exceeds file length: " + getFileLength());
}
synchronized(infoLock) {
final List<LocatedBlock> blocks;
final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
@ -644,6 +604,9 @@ private synchronized DatanodeInfo blockSeekTo(long target)
if (target >= getFileLength()) {
throw new IOException("Attempted to read past end of file");
}
maybeRegisterBlockRefresh();
// Will be getting a new BlockReader.
closeCurrentBlockReaders();
@ -657,9 +620,6 @@ private synchronized DatanodeInfo blockSeekTo(long target)
boolean connectFailedOnce = false;
while (true) {
// Re-fetch the locatedBlocks from NN if the timestamp has expired.
updateBlockLocationsStamp();
//
// Compute desired block
//
@ -793,6 +753,7 @@ public void accept(ByteBuffer k, Object v) {
* this dfsInputStream anymore.
*/
dfsClient.removeNodeFromDeadNodeDetector(this, locatedBlocks);
maybeDeRegisterBlockRefresh();
}
}
@ -871,16 +832,16 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
int len = strategy.getTargetLength();
CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
failures = 0;
maybeRegisterBlockRefresh();
if (pos < getFileLength()) {
int retries = 2;
while (retries > 0) {
try {
// currentNode can be left as null if previous read had a checksum
// error on the same block. See HDFS-3067
// currentNode needs to be updated if the blockLocations timestamp has
// expired.
if (pos > blockEnd || currentNode == null
|| updateBlockLocationsStamp()) {
if (pos > blockEnd || currentNode == null) {
currentNode = blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
@ -1958,4 +1919,153 @@ public boolean hasCapability(String capability) {
return false;
}
}
/**
* Many DFSInputStreams can be opened and closed in quick succession, in which case
* they would be registered/deregistered but never need to be refreshed.
* Defers registering with the located block refresher, in order to avoid an additional
* source of unnecessary synchronization for short-lived DFSInputStreams.
*/
protected void maybeRegisterBlockRefresh() {
if (!dfsClient.getConf().isRefreshReadBlockLocationsAutomatically()
|| !dfsClient.getConf().isLocatedBlocksRefresherEnabled()) {
return;
}
if (refreshingBlockLocations.get()) {
return;
}
// not enough time elapsed to refresh
long timeSinceLastRefresh = Time.monotonicNow() - lastRefreshedBlocksAt;
if (timeSinceLastRefresh < dfsClient.getConf().getLocatedBlocksRefresherInterval()) {
return;
}
if (!refreshingBlockLocations.getAndSet(true)) {
dfsClient.addLocatedBlocksRefresh(this);
}
}
/**
* De-register periodic refresh of this inputstream, if it was added to begin with.
*/
private void maybeDeRegisterBlockRefresh() {
if (refreshingBlockLocations.get()) {
dfsClient.removeLocatedBlocksRefresh(this);
}
}
/**
* Refresh blocks for the input stream, if necessary.
*
* @param addressCache optional map to use as a cache for resolving datanode InetSocketAddress
* @return whether a refresh was performed or not
*/
boolean refreshBlockLocations(Map<String, InetSocketAddress> addressCache) {
LocatedBlocks blocks;
synchronized (infoLock) {
blocks = getLocatedBlocks();
}
if (getLocalDeadNodes().isEmpty() && allBlocksLocal(blocks, addressCache)) {
return false;
}
try {
DFSClient.LOG.debug("Refreshing {} for path {}", this, getSrc());
LocatedBlocks newLocatedBlocks = fetchAndCheckLocatedBlocks(blocks);
long lastBlockLength = getLastBlockLength(newLocatedBlocks);
if (lastBlockLength == -1) {
DFSClient.LOG.debug(
"Discarding refreshed blocks for path {} because lastBlockLength was -1",
getSrc());
return true;
}
setRefreshedValues(newLocatedBlocks, lastBlockLength);
} catch (IOException e) {
DFSClient.LOG.debug("Failed to refresh DFSInputStream for path {}", getSrc(), e);
}
return true;
}
/**
* Once new LocatedBlocks have been fetched, sets them on the DFSInputStream and
* updates stateful read location within the necessary locks.
*/
private synchronized void setRefreshedValues(LocatedBlocks blocks, long lastBlockLength)
throws IOException {
synchronized (infoLock) {
setLocatedBlocksFields(blocks, lastBlockLength);
}
getLocalDeadNodes().clear();
// if a stateful read has been initialized, refresh it
if (currentNode != null) {
currentNode = blockSeekTo(pos);
}
}
private boolean allBlocksLocal(LocatedBlocks blocks,
Map<String, InetSocketAddress> addressCache) {
if (addressCache == null) {
addressCache = new HashMap<>();
}
// we only need to check the first location of each block, because the blocks are already
// sorted by distance from the current host
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
if (lb.getLocations().length == 0) {
return false;
}
DatanodeInfoWithStorage location = lb.getLocations()[0];
if (location == null) {
return false;
}
InetSocketAddress targetAddr = addressCache.computeIfAbsent(
location.getDatanodeUuid(),
unused -> {
String dnAddr = location.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
return NetUtils.createSocketAddr(
dnAddr,
-1,
null,
dfsClient.getConf().isUriCacheEnabled());
});
if (!isResolveableAndLocal(targetAddr)) {
return false;
}
}
return true;
}
private boolean isResolveableAndLocal(InetSocketAddress targetAddr) {
try {
return DFSUtilClient.isLocalAddress(targetAddr);
} catch (IOException e) {
DFSClient.LOG.debug("Got an error checking if {} is local", targetAddr, e);
return false;
}
}
@VisibleForTesting
void setLastRefreshedBlocksAtForTesting(long timestamp) {
lastRefreshedBlocksAt = timestamp;
}
@VisibleForTesting
long getLastRefreshedBlocksAtForTesting() {
return lastRefreshedBlocksAt;
}
private void setLastRefreshedBlocksAt() {
lastRefreshedBlocksAt = Time.monotonicNow();
}
}

View File

@ -141,14 +141,6 @@ protected ByteBuffer getCurStripeBuf() {
return curStripeBuf;
}
protected String getSrc() {
return src;
}
protected LocatedBlocks getLocatedBlocks() {
return locatedBlocks;
}
protected ByteBufferPool getBufferPool() {
return BUFFER_POOL;
}
@ -166,6 +158,8 @@ synchronized void blockSeekTo(long target) throws IOException {
throw new IOException("Attempted to read past end of file");
}
maybeRegisterBlockRefresh();
// Will be getting a new BlockReader.
closeCurrentBlockReaders();

View File

@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Periodically refresh the underlying cached {@link LocatedBlocks} for eligible registered
* {@link DFSInputStream}s. DFSInputStreams are eligible for refreshing if they have any
* deadNodes or any blocks are lacking local replicas.
* Disabled by default, unless an interval is configured.
*/
public class LocatedBlocksRefresher extends Daemon {
private static final Logger LOG =
LoggerFactory.getLogger(LocatedBlocksRefresher.class);
private static final String THREAD_PREFIX = "located-block-refresher-";
private final String name;
private final long interval;
private final long jitter;
private final ExecutorService refreshThreadPool;
// Use WeakHashMap so that we don't hold onto references that might have not been explicitly
// closed because they were created and thrown away.
private final Set<DFSInputStream> registeredInputStreams =
Collections.newSetFromMap(new WeakHashMap<>());
private int runCount;
private int refreshCount;
LocatedBlocksRefresher(String name, Configuration conf, DfsClientConf dfsClientConf) {
this.name = name;
this.interval = dfsClientConf.getLocatedBlocksRefresherInterval();
this.jitter = Math.round(this.interval * 0.1);
int rpcThreads = conf.getInt(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY,
DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT);
String threadPrefix;
if (name.equals(DFS_CLIENT_CONTEXT_DEFAULT)) {
threadPrefix = THREAD_PREFIX;
} else {
threadPrefix = THREAD_PREFIX + name + "-";
}
this.refreshThreadPool = Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName(threadPrefix + threadIndex.getAndIncrement());
return t;
}
});
setName(threadPrefix + "main");
LOG.info("Start located block refresher for DFSClient {}.", this.name);
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
if (!waitForInterval()) {
return;
}
LOG.debug("Running refresh for {} streams", registeredInputStreams.size());
long start = Time.monotonicNow();
AtomicInteger neededRefresh = new AtomicInteger(0);
Phaser phaser = new Phaser(1);
Map<String, InetSocketAddress> addressCache = new ConcurrentHashMap<>();
for (DFSInputStream inputStream : getInputStreams()) {
phaser.register();
refreshThreadPool.submit(() -> {
try {
if (isInputStreamTracked(inputStream) &&
inputStream.refreshBlockLocations(addressCache)) {
neededRefresh.incrementAndGet();
}
} finally {
phaser.arriveAndDeregister();
}
});
}
phaser.arriveAndAwaitAdvance();
synchronized (this) {
runCount++;
refreshCount += neededRefresh.get();
}
LOG.debug(
"Finished refreshing {} of {} streams in {}ms",
neededRefresh,
registeredInputStreams.size(),
Time.monotonicNow() - start
);
}
}
public synchronized int getRunCount() {
return runCount;
}
public synchronized int getRefreshCount() {
return refreshCount;
}
private boolean waitForInterval() {
try {
Thread.sleep(interval + ThreadLocalRandom.current().nextLong(-jitter, jitter));
return true;
} catch (InterruptedException e) {
LOG.debug("Interrupted during wait interval", e);
Thread.currentThread().interrupt();
return false;
}
}
/**
* Shutdown all the threads.
*/
public void shutdown() {
if (isAlive()) {
interrupt();
try {
join();
} catch (InterruptedException e) {
}
}
refreshThreadPool.shutdown();
}
/**
* Collects the DFSInputStreams to a list within synchronization, so that we can iterate them
* without potentially blocking callers to {@link #addInputStream(DFSInputStream)} or
* {@link #removeInputStream(DFSInputStream)}. We don't care so much about missing additions,
* and we'll guard against removals by doing an additional
* {@link #isInputStreamTracked(DFSInputStream)} track during iteration.
*/
private synchronized Collection<DFSInputStream> getInputStreams() {
return new ArrayList<>(registeredInputStreams);
}
public synchronized void addInputStream(DFSInputStream dfsInputStream) {
LOG.trace("Registering {} for {}", dfsInputStream, dfsInputStream.getSrc());
registeredInputStreams.add(dfsInputStream);
}
public synchronized void removeInputStream(DFSInputStream dfsInputStream) {
if (isInputStreamTracked(dfsInputStream)) {
LOG.trace("De-registering {} for {}", dfsInputStream, dfsInputStream.getSrc());
registeredInputStreams.remove(dfsInputStream);
}
}
public synchronized boolean isInputStreamTracked(DFSInputStream dfsInputStream) {
return registeredInputStreams.contains(dfsInputStream);
}
public long getInterval() {
return interval;
}
}

View File

@ -205,6 +205,19 @@ public interface HdfsClientConfigKeys {
"dfs.client.refresh.read-block-locations.ms";
long DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT = 0L;
// Number of threads to use for refreshing LocatedBlocks of registered
// DFSInputStreams. If a DFSClient opens many DFSInputStreams, increasing
// this may help refresh them all in a timely manner.
String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY =
"dfs.client.refresh.read-block-locations.threads";
int DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT = 5;
// Whether to auto-register all DFSInputStreams for background refreshes.
// If false, user must manually register using DFSClient#addLocatedBlocksRefresh(DFSInputStream)
String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_KEY =
"dfs.client.refresh.read-block-locations.register-automatically";
boolean DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_DEFAULT = true;
String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
"dfs.datanode.kerberos.principal";
String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";

View File

@ -148,6 +148,7 @@ public class DfsClientConf {
/** wait time window before refreshing blocklocation for inputstream. */
private final long refreshReadBlockLocationsMS;
private final boolean refreshReadBlockLocationsAutomatically;
private final ShortCircuitConf shortCircuitConf;
private final int clientShortCircuitNum;
@ -273,6 +274,10 @@ public DfsClientConf(Configuration conf) {
HdfsClientConfigKeys.
DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
refreshReadBlockLocationsAutomatically = conf.getBoolean(
HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_KEY,
HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_DEFAULT);
hedgedReadThresholdMillis = conf.getLong(
HedgedRead.THRESHOLD_MILLIS_KEY,
HedgedRead.THRESHOLD_MILLIS_DEFAULT);
@ -714,13 +719,18 @@ public boolean isReadUseCachePriority() {
return replicaAccessorBuilderClasses;
}
/**
* @return the replicaAccessorBuilderClasses
*/
public long getRefreshReadBlockLocationsMS() {
public boolean isLocatedBlocksRefresherEnabled() {
return refreshReadBlockLocationsMS > 0;
}
public long getLocatedBlocksRefresherInterval() {
return refreshReadBlockLocationsMS;
}
public boolean isRefreshReadBlockLocationsAutomatically() {
return refreshReadBlockLocationsAutomatically;
}
/**
* @return the shortCircuitConf
*/

View File

@ -3367,6 +3367,25 @@
</description>
</property>
<property>
<name>dfs.client.refresh.read-block-locations.register-automatically</name>
<value>true</value>
<description>
Whether to auto-register all DFSInputStreams for background LocatedBlock refreshes.
If false, user must manually register using DFSClient#addLocatedBlocksRefresh(DFSInputStream)
</description>
</property>
<property>
<name>dfs.client.refresh.read-block-locations.threads</name>
<value>5</value>
<description>
Number of threads to use for refreshing LocatedBlocks of registered
DFSInputStreams. If a DFSClient opens many DFSInputStreams, increasing
this may help refresh them all in a timely manner.
</description>
</property>
<property>
<name>dfs.namenode.lease-recheck-interval-ms</name>
<value>2000</value>

View File

@ -21,28 +21,25 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -137,154 +134,111 @@ public void teardown() throws IOException {
}
@Test
public void testRead() throws Exception {
public void testRefreshBlockLocations() throws IOException {
final String fileName = "/test_cache_locations";
filePath = new Path(fileName);
filePath = createFile(fileName);
try (DFSInputStream fin = dfsClient.open(fileName)) {
LocatedBlocks existing = fin.locatedBlocks;
long lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();
assertFalse("should not have attempted refresh",
fin.refreshBlockLocations(null));
assertEquals("should not have updated lastRefreshedAt",
lastRefreshedAt, fin.getLastRefreshedBlocksAtForTesting());
assertSame("should not have modified locatedBlocks",
existing, fin.locatedBlocks);
// fake a dead node to force refresh
// refreshBlockLocations should return true, indicating we attempted a refresh
// nothing should be changed, because locations have not changed
fin.addToLocalDeadNodes(dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]);
assertTrue("should have attempted refresh",
fin.refreshBlockLocations(null));
verifyChanged(fin, existing, lastRefreshedAt);
// reset
lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();
existing = fin.locatedBlocks;
// It's hard to test explicitly for non-local nodes, but we can fake it
// because we also treat unresolved as non-local. Pass in a cache where all the datanodes
// are unresolved hosts.
Map<String, InetSocketAddress> mockAddressCache = new HashMap<>();
InetSocketAddress unresolved = InetSocketAddress.createUnresolved("www.google.com", 80);
for (DataNode dataNode : dfsCluster.getDataNodes()) {
mockAddressCache.put(dataNode.getDatanodeUuid(), unresolved);
}
assertTrue("should have attempted refresh",
fin.refreshBlockLocations(mockAddressCache));
verifyChanged(fin, existing, lastRefreshedAt);
}
}
private void verifyChanged(DFSInputStream fin, LocatedBlocks existing, long lastRefreshedAt) {
assertTrue("lastRefreshedAt should have incremented",
fin.getLastRefreshedBlocksAtForTesting() > lastRefreshedAt);
assertNotSame("located blocks should have changed",
existing, fin.locatedBlocks);
assertTrue("deadNodes should be empty",
fin.getLocalDeadNodes().isEmpty());
}
@Test
public void testDeferredRegistrationStatefulRead() throws IOException {
testWithRegistrationMethod(DFSInputStream::read);
}
@Test
public void testDeferredRegistrationPositionalRead() throws IOException {
testWithRegistrationMethod(fin -> fin.readFully(0, new byte[1]));
}
@Test
public void testDeferredRegistrationGetAllBlocks() throws IOException {
testWithRegistrationMethod(DFSInputStream::getAllBlocks);
}
@FunctionalInterface
interface ThrowingConsumer {
void accept(DFSInputStream fin) throws IOException;
}
private void testWithRegistrationMethod(ThrowingConsumer registrationMethod) throws IOException {
final String fileName = "/test_cache_locations";
filePath = createFile(fileName);
DFSInputStream fin = null;
FSDataOutputStream fout = null;
try {
// create a file and write for testing
fout = fs.create(filePath, REPLICATION_FACTOR);
fout.write(new byte[(fileLength)]);
// finalize the file by closing the output stream
fout.close();
fout = null;
// get the located blocks
LocatedBlocks referenceLocatedBlocks =
dfsClient.getLocatedBlocks(fileName, 0, fileLength);
assertEquals(numOfBlocks, referenceLocatedBlocks.locatedBlockCount());
String poolId = dfsCluster.getNamesystem().getBlockPoolId();
fin = dfsClient.open(fileName);
// get the located blocks from fin
LocatedBlocks finLocatedBlocks = fin.locatedBlocks;
assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
finLocatedBlocks.locatedBlockCount());
final int chunkReadSize = BLOCK_SIZE / 4;
byte[] readBuffer = new byte[chunkReadSize];
// read the first block
DatanodeInfo prevDNInfo = null;
DatanodeInfo currDNInfo = null;
int bytesRead = 0;
int firstBlockMark = BLOCK_SIZE;
// get the second block locations
LocatedBlock firstLocatedBlk =
fin.locatedBlocks.getLocatedBlocks().get(0);
DatanodeInfo[] firstBlkDNInfos = firstLocatedBlk.getLocations();
while (fin.getPos() < firstBlockMark) {
bytesRead = fin.read(readBuffer);
Assert.assertTrue("Unexpected number of read bytes",
chunkReadSize >= bytesRead);
if (currDNInfo == null) {
currDNInfo = fin.getCurrentDatanode();
assertNotNull("current FIS datanode is null", currDNInfo);
continue;
}
prevDNInfo = currDNInfo;
currDNInfo = fin.getCurrentDatanode();
assertEquals("the DFSInput stream does not read from same node",
prevDNInfo, currDNInfo);
}
assertFalse("should not be tracking input stream on open",
dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
assertEquals("InputStream exceeds expected position",
firstBlockMark, fin.getPos());
// get the second block locations
LocatedBlock secondLocatedBlk =
fin.locatedBlocks.getLocatedBlocks().get(1);
// get the nodeinfo for that block
DatanodeInfo[] secondBlkDNInfos = secondLocatedBlk.getLocations();
DatanodeInfo deadNodeInfo = secondBlkDNInfos[0];
// stop the datanode in the list of the
DataNode deadNode = getdataNodeFromHostName(dfsCluster,
deadNodeInfo.getHostName());
// Shutdown and wait for datanode to be marked dead
DatanodeRegistration reg = InternalDataNodeTestUtils.
getDNRegistrationForBP(dfsCluster.getDataNodes().get(0), poolId);
DataNodeProperties stoppedDNProps =
dfsCluster.stopDataNode(deadNodeInfo.getName());
// still not registered because it hasn't been an hour by the time we call this
registrationMethod.accept(fin);
assertFalse("should not be tracking input stream after first read",
dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
List<DataNode> datanodesPostStoppage = dfsCluster.getDataNodes();
assertEquals(NUM_DATA_NODES - 1, datanodesPostStoppage.size());
// get the located blocks
LocatedBlocks afterStoppageLocatedBlocks =
dfsClient.getLocatedBlocks(fileName, 0, fileLength);
// read second block
int secondBlockMark = (int) (1.5 * BLOCK_SIZE);
boolean firstIteration = true;
if (this.enableBlkExpiration) {
// set the time stamps to make sure that we do not refresh locations yet
fin.setReadTimeStampsForTesting(Time.monotonicNow());
}
while (fin.getPos() < secondBlockMark) {
bytesRead = fin.read(readBuffer);
assertTrue("dead node used to read at position: " + fin.getPos(),
fin.deadNodesContain(deadNodeInfo));
Assert.assertTrue("Unexpected number of read bytes",
chunkReadSize >= bytesRead);
prevDNInfo = currDNInfo;
currDNInfo = fin.getCurrentDatanode();
assertNotEquals(deadNodeInfo, currDNInfo);
if (firstIteration) {
// currDNInfo has to be different unless first block locs is different
assertFalse("FSInputStream should pick a different DN",
firstBlkDNInfos[0].equals(deadNodeInfo)
&& prevDNInfo.equals(currDNInfo));
firstIteration = false;
}
}
assertEquals("InputStream exceeds expected position",
secondBlockMark, fin.getPos());
// restart the dead node with the same port
assertTrue(dfsCluster.restartDataNode(stoppedDNProps, true));
dfsCluster.waitActive();
List<DataNode> datanodesPostRestart = dfsCluster.getDataNodes();
assertEquals(NUM_DATA_NODES, datanodesPostRestart.size());
// continue reading from block 2 again. We should read from deadNode
int thirdBlockMark = 2 * BLOCK_SIZE;
firstIteration = true;
while (fin.getPos() < thirdBlockMark) {
bytesRead = fin.read(readBuffer);
if (this.enableBlkExpiration) {
assertEquals("node is removed from deadNodes after 1st iteration",
firstIteration, fin.deadNodesContain(deadNodeInfo));
} else {
assertTrue(fin.deadNodesContain(deadNodeInfo));
}
Assert.assertTrue("Unexpected number of read bytes",
chunkReadSize >= bytesRead);
prevDNInfo = currDNInfo;
currDNInfo = fin.getCurrentDatanode();
if (!this.enableBlkExpiration) {
assertNotEquals(deadNodeInfo, currDNInfo);
}
if (firstIteration) {
assertEquals(prevDNInfo, currDNInfo);
firstIteration = false;
if (this.enableBlkExpiration) {
// reset the time stamps of located blocks to force cache expiration
fin.setReadTimeStampsForTesting(
Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
}
}
}
assertEquals("InputStream exceeds expected position",
thirdBlockMark, fin.getPos());
// artificially make it have been an hour
fin.setLastRefreshedBlocksAtForTesting(Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
registrationMethod.accept(fin);
assertEquals("SHOULD be tracking input stream on read after interval, only if enabled",
enableBlkExpiration, dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
} finally {
if (fout != null) {
fout.close();
}
if (fin != null) {
fin.close();
assertFalse(dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
}
fs.delete(filePath, true);
}
}
private DataNode getdataNodeFromHostName(MiniDFSCluster cluster,
String hostName) {
for (DataNode dn : cluster.getDataNodes()) {
if (dn.getDatanodeId().getHostName().equals(hostName)) {
return dn;
}
private Path createFile(String fileName) throws IOException {
Path path = new Path(fileName);
try (FSDataOutputStream fout = fs.create(path, REPLICATION_FACTOR)) {
fout.write(new byte[(fileLength)]);
}
return null;
return path;
}
}
}

View File

@ -0,0 +1,266 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestLocatedBlocksRefresher {
private static final Logger LOG = LoggerFactory.getLogger(TestLocatedBlocksRefresher.class);
private static final int BLOCK_SIZE = 1024 * 1024;
private static final short REPLICATION_FACTOR = (short) 4;
private static final String[] RACKS = new String[] {
"/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
private static final int NUM_DATA_NODES = RACKS.length;
private final int numOfBlocks = 24;
private final int fileLength = numOfBlocks * BLOCK_SIZE;
private final int dfsClientPrefetchSize = fileLength / 2;
private MiniDFSCluster cluster;
private Configuration conf;
@Before
public void setUp() throws Exception {
cluster = null;
conf = new HdfsConfiguration();
// disable shortcircuit reading
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
// set replication factor
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR);
// set block size and other sizes
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
dfsClientPrefetchSize);
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown(true, true);
}
}
private void setupTest(long refreshInterval) throws IOException {
conf.setLong(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY, refreshInterval);
// this is necessary to ensure no caching between runs
conf.set("dfs.client.context", UUID.randomUUID().toString());
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
cluster.waitActive();
}
@Test
public void testDisabledOnZeroInterval() throws IOException {
setupTest(0);
assertNull(cluster.getFileSystem().getClient().getLocatedBlockRefresher());
}
@Test
public void testEnabledOnNonZeroInterval() throws Exception {
setupTest(1000);
LocatedBlocksRefresher refresher =
cluster.getFileSystem().getClient().getLocatedBlockRefresher();
assertNotNull(refresher);
assertNoMoreRefreshes(refresher);
}
@Test
public void testRefreshOnDeadNodes() throws Exception {
setupTest(1000);
DistributedFileSystem fs = cluster.getFileSystem();
DFSClient client = fs.getClient();
LocatedBlocksRefresher refresher = client.getLocatedBlockRefresher();
String fileName = createTestFile(fs);
try (DFSInputStream fin = client.open(fileName)) {
LocatedBlocks locatedBlocks = fin.locatedBlocks;
assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
locatedBlocks.locatedBlockCount());
// should not be tracked yet
assertFalse(refresher.isInputStreamTracked(fin));
// track and verify
refresher.addInputStream(fin);
assertTrue(refresher.isInputStreamTracked(fin));
// no refreshes yet, as nothing has happened
assertNoMoreRefreshes(refresher);
synchronized (fin.infoLock) {
assertSame(locatedBlocks, fin.locatedBlocks);
}
stopNodeHostingBlocks(fin, NUM_DATA_NODES - 1);
// read blocks, which should trigger dead node for the one we stopped
int chunkReadSize = BLOCK_SIZE / 4;
byte[] readBuffer = new byte[chunkReadSize];
fin.read(0, readBuffer, 0, readBuffer.length);
assertEquals(1, fin.getLocalDeadNodes().size());
// we should get a refresh now
assertRefreshes(refresher, 1);
// verify that it actually changed things
synchronized (fin.infoLock) {
assertNotSame(locatedBlocks, fin.locatedBlocks);
assertTrue(fin.getLocalDeadNodes().isEmpty());
}
// no more refreshes because everything is happy again
assertNoMoreRefreshes(refresher);
// stop another node, and try to trigger a new deadNode
stopNodeHostingBlocks(fin, NUM_DATA_NODES - 2);
readBuffer = new byte[chunkReadSize];
fin.read(0, readBuffer, 0, readBuffer.length);
// we should refresh again now, and verify
// may actually be more than 1, since the first dead node
// may still be listed in the replicas for the bock
assertTrue(fin.getLocalDeadNodes().size() > 0);
assertRefreshes(refresher, 1);
synchronized (fin.infoLock) {
assertNotSame(locatedBlocks, fin.locatedBlocks);
assertTrue(fin.getLocalDeadNodes().isEmpty());
}
// de-register, and expect no more refreshes below
refresher.removeInputStream(fin);
}
assertNoMoreRefreshes(refresher);
}
private void stopNodeHostingBlocks(DFSInputStream fin, int expectedNodes) {
synchronized (fin.infoLock) {
int idx = fin.locatedBlocks.findBlock(0);
for (int i = 0; i < REPLICATION_FACTOR; i++) {
String deadNodeAddr = fin.locatedBlocks.get(idx).getLocations()[i].getXferAddr();
DataNodeProperties dataNodeProperties = cluster.stopDataNode(deadNodeAddr);
if (dataNodeProperties != null) {
List<DataNode> datanodesPostStoppage = cluster.getDataNodes();
assertEquals(expectedNodes, datanodesPostStoppage.size());
return;
}
}
throw new RuntimeException("Could not find a datanode to stop");
}
}
private void assertNoMoreRefreshes(LocatedBlocksRefresher refresher) throws InterruptedException {
long interval = refresher.getInterval();
int runCount = refresher.getRunCount();
int refreshCount = refresher.getRefreshCount();
LOG.info("Waiting for at least {} runs, from current {}, expecting no refreshes",
runCount + 3, runCount);
// wait for it to run 3 times, with some buffer
awaitWithTimeout(() -> refresher.getRunCount() > runCount + 3, 5 * interval);
// it should not have refreshed anything, because no DFSInputStreams registered anymore
assertEquals(refreshCount, refresher.getRefreshCount());
}
private void assertRefreshes(LocatedBlocksRefresher refresher, int expectedRefreshes)
throws InterruptedException {
int runCount = refresher.getRunCount();
int refreshCount = refresher.getRefreshCount();
int expectedRuns = 3;
if (expectedRefreshes < 0) {
expectedRefreshes = expectedRuns;
}
LOG.info(
"Waiting for at least {} runs, from current {}. Expecting {} refreshes, from current {}",
runCount + expectedRuns, runCount, refreshCount + expectedRefreshes, refreshCount
);
// wait for it to run 3 times
awaitWithTimeout(() -> refresher.getRunCount() >= runCount + expectedRuns, 10_000);
// the values may not be identical due to any refreshes that occurred before we opened
// the DFSInputStream but the difference should be identical since we are refreshing
// every time
assertEquals(expectedRefreshes, refresher.getRefreshCount() - refreshCount);
}
private void awaitWithTimeout(Supplier<Boolean> test, long timeoutMillis)
throws InterruptedException {
long now = Time.monotonicNow();
while(!test.get()) {
if (Time.monotonicNow() - now > timeoutMillis) {
fail("Timed out waiting for true condition");
return;
}
Thread.sleep(50);
}
}
private String createTestFile(FileSystem fs) throws IOException {
String fileName = "/located_blocks_" + UUID.randomUUID().toString();
Path filePath = new Path(fileName);
try (FSDataOutputStream fout = fs.create(filePath, REPLICATION_FACTOR)) {
fout.write(new byte[(fileLength)]);
}
fs.deleteOnExit(filePath);
return fileName;
}
}