HDFS-17332 DFSInputStream: avoid logging stacktrace until when we really need to fail a read request with a MissingBlockException (#6446)

Print a warn log message for read retries and only print the full stack trace for a read request failure.

Contributed by: Xing Lin
Author: Xing Lin, 2024-01-18 18:03:28 -08:00 (committed by GitHub)
parent cc4c4be1b7
commit 27ecc23ae7
4 changed files with 404 additions and 30 deletions
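
The behavior this patch introduces can be summarized by a minimal, hypothetical sketch (the class and method names below are illustrative, not the actual DFSInputStream code; only the log message wording mirrors the patch): each failed attempt against a datanode is recorded in a per-request Map<InetSocketAddress, List<IOException>> and logged as a single WARN line without a stack trace, and the collected stack traces are written to the log only when the read ultimately fails.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Simplified sketch of the retry-logging pattern; not part of the patch itself. */
public class ReadRetryLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(ReadRetryLoggingSketch.class);

  // One entry per datanode that failed while serving this read request.
  private final Map<InetSocketAddress, List<IOException>> exceptionMap = new HashMap<>();

  /** Record a failed attempt against a datanode: one-line WARN, no stack trace. */
  void recordRetry(String src, InetSocketAddress datanode, IOException e) {
    LOG.warn("Failed to read file {} from datanode {}. Exception is {}. Retrying.",
        src, datanode, e.toString());
    exceptionMap.computeIfAbsent(datanode, k -> new LinkedList<>()).add(e);
  }

  /** Only when the request finally fails are the full stack traces flushed to the log. */
  void logAllOnFailure(String src, long position) {
    LOG.error("Failed to read from all available datanodes for file {} at position={} "
        + "after retrying.", src, position);
    for (Map.Entry<InetSocketAddress, List<IOException>> entry : exceptionMap.entrySet()) {
      for (IOException ex : entry.getValue()) {
        LOG.error("Exception when fetching file {} at position={} at datanode {}:",
            src, position, entry.getKey(), ex);
      }
    }
  }
}

The point of the design is to avoid log spam from transient, recoverable fetch failures while still preserving full diagnostics when the read finally fails with a BlockMissingException.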

DFSInputStream.java

@@ -771,7 +771,7 @@ public synchronized int read() throws IOException {
    * ChecksumFileSystem
    */
   private synchronized int readBuffer(ReaderStrategy reader, int len,
-      CorruptedBlocks corruptedBlocks)
+      CorruptedBlocks corruptedBlocks, final Map<InetSocketAddress, List<IOException>> exceptionMap)
       throws IOException {
     IOException ioe;
@@ -786,6 +786,7 @@ private synchronized int readBuffer(ReaderStrategy reader, int len,
     while (true) {
       // retry as many times as seekToNewSource allows.
       try {
+        DFSClientFaultInjector.get().fetchFromDatanodeException();
         return reader.readFromBlock(blockReader, len);
       } catch (ChecksumException ce) {
         DFSClient.LOG.warn("Found Checksum error for "
@@ -796,11 +797,18 @@ private synchronized int readBuffer(ReaderStrategy reader, int len,
         // we want to remember which block replicas we have tried
         corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode);
       } catch (IOException e) {
-        if (!retryCurrentNode) {
-          DFSClient.LOG.warn("Exception while reading from "
-              + getCurrentBlock() + " of " + src + " from "
-              + currentNode, e);
+        String msg = String.format("Failed to read block %s for file %s from datanode %s. "
+                + "Exception is %s. Retry with the current or next available datanode.",
+            getCurrentBlock().getBlockName(), src, currentNode.getXferAddr(), e);
+        DFSClient.LOG.warn(msg);
+
+        // Add the exception to exceptionMap for this datanode.
+        InetSocketAddress datanode = currentNode.getResolvedAddress();
+        if (!exceptionMap.containsKey(datanode)) {
+          exceptionMap.put(datanode, new LinkedList<IOException>());
         }
+        exceptionMap.get(datanode).add(e);
+
         ioe = e;
       }
       boolean sourceFound;
@@ -822,6 +830,29 @@ private synchronized int readBuffer(ReaderStrategy reader, int len,
     }
   }
 
+  /**
+   * Send IOExceptions happened at each individual datanode to DFSClient.LOG for a failed read
+   * request. Used in both readWithStrategy() and pread(), to record the exceptions when a read
+   * request failed to be served.
+   * @param position offset in the file where we fail to read
+   * @param exceptionMap a map which stores the list of IOExceptions for each datanode
+   */
+  private void logDataNodeExceptionsOnReadError(long position, final Map<InetSocketAddress,
+      List<IOException>> exceptionMap) {
+    String msg = String.format("Failed to read from all available datanodes for file %s "
+        + "at position=%d after retrying.", src, position);
+    DFSClient.LOG.error(msg);
+    for (Map.Entry<InetSocketAddress, List<IOException>> dataNodeExceptions :
+        exceptionMap.entrySet()) {
+      List<IOException> exceptions = dataNodeExceptions.getValue();
+      for (IOException ex : exceptions) {
+        msg = String.format("Exception when fetching file %s at position=%d at datanode %s:", src,
+            position, dataNodeExceptions.getKey());
+        DFSClient.LOG.error(msg, ex);
+      }
+    }
+  }
+
   protected synchronized int readWithStrategy(ReaderStrategy strategy)
       throws IOException {
     dfsClient.checkOpen();
@@ -831,6 +862,9 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
     int len = strategy.getTargetLength();
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
+    // A map to record IOExceptions when fetching from each datanode. Key is the socketAddress of
+    // a datanode.
+    Map<InetSocketAddress, List<IOException>> exceptionMap = new HashMap<>();
     failures = 0;
 
     maybeRegisterBlockRefresh();
@@ -852,7 +886,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
         }
       }
       long beginReadMS = Time.monotonicNow();
-      int result = readBuffer(strategy, realLen, corruptedBlocks);
+      int result = readBuffer(strategy, realLen, corruptedBlocks, exceptionMap);
       long readTimeMS = Time.monotonicNow() - beginReadMS;
       if (result >= 0) {
         pos += result;
@@ -880,6 +914,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
           dfsClient.addNodeToDeadNodeDetector(this, currentNode);
         }
         if (--retries == 0) {
+          // Fail the request and log all exceptions
+          logDataNodeExceptionsOnReadError(pos, exceptionMap);
           throw e;
         }
       } finally {
@@ -1122,8 +1158,8 @@ private static String getBestNodeDNAddrPairErrorString(
     return errMsgr.toString();
   }
 
-  protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
-      ByteBuffer buf, CorruptedBlocks corruptedBlocks)
+  protected void fetchBlockByteRange(LocatedBlock block, long start, long end, ByteBuffer buf,
+      CorruptedBlocks corruptedBlocks, final Map<InetSocketAddress, List<IOException>> exceptionMap)
       throws IOException {
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
@@ -1131,7 +1167,7 @@ protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
       block = addressPair.block;
       try {
         actualGetFromOneDataNode(addressPair, start, end, buf,
-            corruptedBlocks);
+            corruptedBlocks, exceptionMap);
         return;
       } catch (IOException e) {
         checkInterrupted(e); // check if the read has been interrupted
@@ -1142,15 +1178,15 @@ protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
   }
 
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
-      final LocatedBlock block, final long start, final long end,
+      final long start, final long end,
       final ByteBuffer bb,
       final CorruptedBlocks corruptedBlocks,
-      final int hedgedReadId) {
+      final Map<InetSocketAddress, List<IOException>> exceptionMap) {
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
         DFSClientFaultInjector.get().sleepBeforeHedgedGet();
-        actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlocks);
+        actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlocks, exceptionMap);
         return bb;
       }
     };
@@ -1167,7 +1203,8 @@ public ByteBuffer call() throws Exception {
    * block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
-      final long endInBlk, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
+      final long endInBlk, ByteBuffer buf, CorruptedBlocks corruptedBlocks,
+      final Map<InetSocketAddress, List<IOException>> exceptionMap)
       throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
@@ -1236,9 +1273,16 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
             // ignore IOE, since we can retry it later in a loop
           }
         } else {
-          String msg = "Failed to connect to " + datanode.addr + " for file "
-              + src + " for block " + block.getBlock() + ":" + e;
-          DFSClient.LOG.warn("Connection failure: " + msg, e);
+          String msg = String.format("Failed to read block %s for file %s from datanode %s. "
+                  + "Exception is %s. Retry with the next available datanode.",
+              block.getBlock().getBlockName(), src, datanode.addr, e);
+          DFSClient.LOG.warn(msg);
+
+          // Add the exception to the exceptionMap
+          if (!exceptionMap.containsKey(datanode.addr)) {
+            exceptionMap.put(datanode.addr, new LinkedList<IOException>());
+          }
+          exceptionMap.get(datanode.addr).add(e);
           addToLocalDeadNodes(datanode.info);
           dfsClient.addNodeToDeadNodeDetector(this, datanode.info);
           throw new IOException(msg);
@@ -1270,9 +1314,9 @@ protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
    * 'hedged' read if the first read is taking longer than configured amount of
    * time. We then wait on which ever read returns first.
    */
-  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
-      long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
-      throws IOException {
+  private void hedgedFetchBlockByteRange(LocatedBlock block, long start, long end, ByteBuffer buf,
+      CorruptedBlocks corruptedBlocks,
+      final Map<InetSocketAddress, List<IOException>> exceptionMap) throws IOException {
     final DfsClientConf conf = dfsClient.getConf();
     ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
     CompletionService<ByteBuffer> hedgedService =
@@ -1280,7 +1324,6 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
     ArrayList<DatanodeInfo> ignored = new ArrayList<>();
     ByteBuffer bb;
     int len = (int) (end - start + 1);
-    int hedgedReadId = 0;
     while (true) {
       // see HDFS-6591, this metric is used to verify/catch unnecessary loops
       hedgedReadOpsLoopNumForTesting++;
@@ -1293,9 +1336,8 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
         // Latest block, if refreshed internally
         block = chosenNode.block;
         bb = ByteBuffer.allocate(len);
-        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-            chosenNode, block, start, end, bb,
-            corruptedBlocks, hedgedReadId++);
+        Callable<ByteBuffer> getFromDataNodeCallable =
+            getFromOneDataNode(chosenNode, start, end, bb, corruptedBlocks, exceptionMap);
         Future<ByteBuffer> firstRequest = hedgedService
             .submit(getFromDataNodeCallable);
         futures.add(firstRequest);
@@ -1335,8 +1377,7 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
         block = chosenNode.block;
         bb = ByteBuffer.allocate(len);
         Callable<ByteBuffer> getFromDataNodeCallable =
-            getFromOneDataNode(chosenNode, block, start, end, bb,
-                corruptedBlocks, hedgedReadId++);
+            getFromOneDataNode(chosenNode, start, end, bb, corruptedBlocks, exceptionMap);
         Future<ByteBuffer> oneMoreRequest =
             hedgedService.submit(getFromDataNodeCallable);
         futures.add(oneMoreRequest);
@@ -1486,6 +1527,11 @@ private int pread(long position, ByteBuffer buffer)
     List<LocatedBlock> blockRange = getBlockRange(position, realLen);
     int remaining = realLen;
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
+    // A map to record all IOExceptions happened at each datanode when fetching a block.
+    // In HDFS-17332, we worked on populating this map only for DFSInputStream, but not for
+    // DFSStripedInputStream. If you need the same function for DFSStripedInputStream, please
+    // work on it yourself (fetchBlockByteRange() in DFSStripedInputStream).
+    Map<InetSocketAddress, List<IOException>> exceptionMap = new HashMap<>();
     for (LocatedBlock blk : blockRange) {
       long targetStart = position - blk.getStartOffset();
       int bytesToRead = (int) Math.min(remaining,
@@ -1494,11 +1540,17 @@ private int pread(long position, ByteBuffer buffer)
       try {
         if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
           hedgedFetchBlockByteRange(blk, targetStart,
-              targetEnd, buffer, corruptedBlocks);
+              targetEnd, buffer, corruptedBlocks, exceptionMap);
         } else {
           fetchBlockByteRange(blk, targetStart, targetEnd,
-              buffer, corruptedBlocks);
+              buffer, corruptedBlocks, exceptionMap);
         }
+      } catch (IOException e) {
+        // When we reach here, it means we fail to fetch the current block from all available
+        // datanodes. Send IOExceptions in exceptionMap to the log and rethrow the exception to
+        // fail this request.
+        logDataNodeExceptionsOnReadError(position, exceptionMap);
+        throw e;
       } finally {
         // Check and report if any block replicas are corrupted.
         // BlockMissingException may be caught if all block replicas are
@@ -1507,6 +1559,8 @@ private int pread(long position, ByteBuffer buffer)
             false);
       }
 
+      // Reset exceptionMap before fetching the next block.
+      exceptionMap.clear();
       remaining -= bytesToRead;
       position += bytesToRead;
     }

DFSStripedInputStream.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.net.InetSocketAddress;
+import java.util.Map;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ReadOption;
@@ -479,10 +481,14 @@ private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
   /**
    * Real implementation of pread.
+   * <p>
+   * Note: exceptionMap is not populated with ioExceptions as what we added for DFSInputStream. If
+   * you need this function, please implement it.
    */
   @Override
   protected void fetchBlockByteRange(LocatedBlock block, long start,
-      long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
+      long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks,
+      final Map<InetSocketAddress, List<IOException>> exceptionMap)
       throws IOException {
     // Refresh the striped block group
     LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());

TestPread.java

@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -35,6 +37,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -52,6 +55,7 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -76,7 +80,12 @@ public class TestPread {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestPread.class.getName());
 
+  private final GenericTestUtils.LogCapturer dfsClientLog =
+      GenericTestUtils.LogCapturer.captureLogs(DFSClient.LOG);
+
+  @BeforeClass
+  public static void setLogLevel() {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, org.apache.log4j.Level.WARN);
+  }
+
   @Before
   public void setup() {
     simulatedStorage = false;
@@ -556,6 +565,164 @@ public Void call() throws IOException {
     }
   }
 
+  /**
+   * Test logging in getFromOneDataNode when the number of IOExceptions can be recovered by
+   * retrying on a different datanode or by refreshing data nodes and retrying each data node one
+   * more time.
+   */
+  @Test(timeout=120000)
+  public void testGetFromOneDataNodeExceptionLogging() throws IOException {
+    // With maxBlockAcquireFailures = 0, we would try on each datanode only once and if
+    // we fail on all three datanodes, we fail the read request.
+    testGetFromOneDataNodeExceptionLogging(0, 0);
+    testGetFromOneDataNodeExceptionLogging(1, 0);
+    testGetFromOneDataNodeExceptionLogging(2, 0);
+
+    // With maxBlockAcquireFailures = 1, we will re-try each datanode a second time.
+    // So, we can tolerate up to 5 datanode fetch failures.
+    testGetFromOneDataNodeExceptionLogging(3, 1);
+    testGetFromOneDataNodeExceptionLogging(4, 1);
+    testGetFromOneDataNodeExceptionLogging(5, 1);
+  }
+
+  /**
+   * Each failed IOException would result in a WARN log of "Failed to connect to XXX. Retry with
+   * the next available datanode.". We verify the number of such log lines match the number of
+   * failed DNs.
+   * <p>
+   * @param ioExceptions number of IOExceptions to throw during a test.
+   * @param maxBlockAcquireFailures number of refreshLocation we would perform once we mark
+   *                                all current data nodes as dead.
+   */
+  private void testGetFromOneDataNodeExceptionLogging(final int ioExceptions,
+      int maxBlockAcquireFailures) throws IOException {
+    dfsClientLog.clearOutput();
+
+    if (ioExceptions < 0 || ioExceptions >= 3 * (maxBlockAcquireFailures + 1)) {
+      return;
+    }
+
+    Configuration conf = new Configuration();
+    conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, maxBlockAcquireFailures);
+    final int[] count = {0};
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+    DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        if (count[0] < ioExceptions) {
+          LOG.info("-------------- throw IOException " + count[0]);
+          count[0]++;
+          throw new IOException("IOException test");
+        }
+        return null;
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    DFSInputStream input = null;
+    Path file = new Path("/testfile.dat");
+
+    try {
+      DFSTestUtil.createFile(fileSys, file, fileSize, fileSize, blockSize, (short) 3, seed);
+      byte[] buffer = new byte[fileSize];
+      input = dfsClient.open(file.toString());
+      input.read(0, buffer, 0, fileSize);
+      assertEquals(ioExceptions, StringUtils.countMatches(dfsClientLog.getOutput(),
+          "Retry with the next available datanode."));
+    } finally {
+      Mockito.reset(injector);
+      IOUtils.cleanupWithLogger(LOG, input);
+      fileSys.close();
+      cluster.shutdown();
+      dfsClientLog.clearOutput();
+    }
+  }
+
+  /**
+   * Test the case where we always hit IOExceptions, causing the read request to fail.
+   */
+  @Test(timeout=60000)
+  public void testFetchFromDataNodeExceptionLoggingFailedRequest() throws IOException {
+    testFetchFromDataNodeExceptionLoggingFailedRequest(0);
+    testFetchFromDataNodeExceptionLoggingFailedRequest(1);
+  }
+
+  /**
+   * We verify that BlockMissingException is threw and there is one ERROR log line of
+   * "Failed to read from all available datanodes for file"
+   * and 3 * (maxBlockAcquireFailures+1) ERROR log lines of
+   * "Exception when fetching file /testfile.dat at position".
+   * <p>
+   * maxBlockAcquireFailures determines how many times we can retry when we fail to read from
+   * all three data nodes.
+   * <ul>
+   *   <li>maxBlockAcquireFailures = 0: no retry. We will only read from each of the three
+   *   data nodes only once. We expect to see 3 ERROR log lines of "Exception when fetching file
+   *   /testfile.dat at position".
+   *   </li>
+   *   <li>maxBlockAcquireFailures = 1: 1 retry. We will read from each of the three data
+   *   nodes twice. We expect to see 6 ERROR log lines of "Exception when fetching file
+   *   /testfile.dat at position".
+   *   </li>
+   * </ul>
+   */
+  private void testFetchFromDataNodeExceptionLoggingFailedRequest(int maxBlockAcquireFailures)
+      throws IOException {
+    dfsClientLog.clearOutput();
+
+    Configuration conf = new Configuration();
+    conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, maxBlockAcquireFailures);
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+    DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        LOG.info("-------------- throw IOException ");
+        throw new IOException("IOException test");
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    DFSInputStream input = null;
+    Path file = new Path("/testfile.dat");
+
+    try {
+      DFSTestUtil.createFile(fileSys, file, fileSize, fileSize, blockSize, (short) 3, seed);
+      byte[] buffer = new byte[fileSize];
+      input = dfsClient.open(file.toString());
+      input.read(0, buffer, 0, fileSize);
+      fail();
+    } catch (BlockMissingException expected) {
+      // Logging from pread
+      assertEquals(1, StringUtils.countMatches(dfsClientLog.getOutput(),
+          "Failed to read from all available datanodes for file"));
+      assertEquals(3 * (maxBlockAcquireFailures + 1),
+          StringUtils.countMatches(dfsClientLog.getOutput(),
+              "Exception when fetching file /testfile.dat at position"));
+      // Logging from actualGetFromOneDataNode
+      assertEquals(3 * (maxBlockAcquireFailures + 1),
+          StringUtils.countMatches(dfsClientLog.getOutput(),
+              "Retry with the next available datanode."));
+    } finally {
+      Mockito.reset(injector);
+      IOUtils.cleanupWithLogger(LOG, input);
+      fileSys.close();
+      cluster.shutdown();
+      dfsClientLog.clearOutput();
+    }
+  }
+
   @Test(timeout=30000)
   public void testHedgedReadFromAllDNFailed() throws IOException {
     Configuration conf = new Configuration();

TestRead.java

@@ -25,6 +25,7 @@
 import java.nio.channels.ClosedByInterruptException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -32,6 +33,8 @@
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,10 +42,32 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil.ShortCircuitTestContext;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class TestRead {
-  final private int BLOCK_SIZE = 512;
+  static final private int BLOCK_SIZE = 512;
+  static final long SEED = 0xDEADBEEFL;
+  static final int FILE_SIZE = BLOCK_SIZE * 10;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRead.class.getName());
+  private final GenericTestUtils.LogCapturer dfsClientLog =
+      GenericTestUtils.LogCapturer.captureLogs(DFSClient.LOG);
+
+  @BeforeClass
+  public static void setLogLevel() {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.WARN);
+  }
 
   private void testEOF(MiniDFSCluster cluster, int fileLength) throws IOException {
     FileSystem fs = cluster.getFileSystem();
@@ -190,4 +215,126 @@ public boolean isSimulated() {
       }
     }
   }
+
+  /**
+   * Test logging in readBuffer() when the number of IOExceptions can be recovered by retrying on
+   * a different datanode or by refreshing data nodes and retrying each data node one more time.
+   */
+  @Test(timeout=120000)
+  public void testReadBufferIOExceptionLogging() throws IOException {
+    testReadBufferIOExceptionLogging(0, 0);
+    testReadBufferIOExceptionLogging(1, 0);
+    testReadBufferIOExceptionLogging(2, 0);
+    testReadBufferIOExceptionLogging(3, 0);
+
+    testReadBufferIOExceptionLogging(4, 1);
+    testReadBufferIOExceptionLogging(5, 1);
+    testReadBufferIOExceptionLogging(6, 1);
+  }
+
+  /**
+   * @param ioExceptions number of IOExceptions to throw during a test.
+   * @param maxBlockAcquireFailures number of refreshLocation we would perform once we mark
+   *                                all current data nodes as dead.
+   */
+  private void testReadBufferIOExceptionLogging(final int ioExceptions,
+      int maxBlockAcquireFailures) throws IOException {
+    dfsClientLog.clearOutput();
+
+    Configuration conf = new Configuration();
+    conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, maxBlockAcquireFailures);
+    final int[] count = {0};
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+    DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        if (count[0] < ioExceptions) {
+          LOG.info("-------------- throw IOException");
+          count[0]++;
+          throw new IOException("IOException test");
+        }
+        return null;
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    DFSInputStream input = null;
+    Path file = new Path("/testfile.dat");
+
+    try {
+      DFSTestUtil.createFile(fileSys, file, FILE_SIZE, FILE_SIZE, BLOCK_SIZE, (short) 3, SEED);
+      byte[] buffer = new byte[FILE_SIZE];
+      input = dfsClient.open(file.toString());
+      input.read(buffer, 0, FILE_SIZE);
+      assertEquals(ioExceptions, StringUtils.countMatches(dfsClientLog.getOutput(),
+          "Retry with the current or next available datanode."));
+    } finally {
+      Mockito.reset(injector);
+      IOUtils.cleanupWithLogger(LOG, input);
+      fileSys.close();
+      cluster.shutdown();
+      dfsClientLog.clearOutput();
+    }
+  }
+
+  /**
+   * Test the case where we always hit IOExceptions, causing the read request to fail.
+   */
+  @Test(timeout=60000)
+  public void testReadBufferIOExceptionLoggingFailedRequest() throws IOException {
+    testReadBufferIOExceptionLoggingFailedRequest(0);
+    testReadBufferIOExceptionLoggingFailedRequest(1);
+  }
+
+  private void testReadBufferIOExceptionLoggingFailedRequest(int maxBlockAcquireFailures)
+      throws IOException {
+    dfsClientLog.clearOutput();
+
+    Configuration conf = new Configuration();
+    conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, maxBlockAcquireFailures);
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+    DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        LOG.info("-------------- throw IOException");
+        throw new IOException("IOException test");
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    DFSInputStream input = null;
+    Path file = new Path("/testfile.dat");
+
+    try {
+      DFSTestUtil.createFile(fileSys, file, FILE_SIZE, FILE_SIZE, BLOCK_SIZE, (short) 3, SEED);
+      byte[] buffer = new byte[FILE_SIZE];
+      input = dfsClient.open(file.toString());
+      input.read(buffer, 0, FILE_SIZE);
+      fail();
+    } catch (BlockMissingException e) {
+      // Logging from readWithStrategy()
+      assertEquals(1, StringUtils.countMatches(dfsClientLog.getOutput(),
+          "Failed to read from all available datanodes for file"));
+      assertEquals(1 + 3L * (maxBlockAcquireFailures + 1),
+          StringUtils.countMatches(dfsClientLog.getOutput(),
+              "Exception when fetching file /testfile.dat at position="));
+      // Logging from actualGetFromOneDataNode
+      assertEquals(1 + 3L * (maxBlockAcquireFailures + 1),
+          StringUtils.countMatches(dfsClientLog.getOutput(),
+              "Retry with the current or next available datanode."));
+    } finally {
+      Mockito.reset(injector);
+      IOUtils.cleanupWithLogger(LOG, input);
+      fileSys.close();
+      cluster.shutdown();
+      dfsClientLog.clearOutput();
+    }
+  }
 }