HDFS-6591. while loop is executed tens of thousands of times in Hedged Read. Contributed by Liang Xie.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1606927 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 2014-06-30 20:46:44 +00:00
parent 4ac6e1d895
commit 0ca41a8f35
4 changed files with 167 additions and 73 deletions
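For readers skimming the diff below: the fix replaces a latch-plus-polling scheme with a java.util.concurrent CompletionService, so the hedged-read retry loop blocks until a result is available instead of spinning. The following is a minimal, self-contained sketch of that poll-then-hedge pattern; the class and method names here are hypothetical, and only the java.util.concurrent usage mirrors what DFSInputStream now does.

    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    // Hypothetical names; only the concurrency calls mirror the fix.
    public class HedgedReadSketch {
      private final ExecutorService pool = Executors.newFixedThreadPool(2);

      byte[] read(Callable<byte[]> primary, Callable<byte[]> hedge,
          long hedgeTimeoutMillis) throws Exception {
        CompletionService<byte[]> service =
            new ExecutorCompletionService<byte[]>(pool);
        service.submit(primary);
        // Block up to the hedge threshold for the first request; no busy loop.
        Future<byte[]> done = service.poll(hedgeTimeoutMillis, TimeUnit.MILLISECONDS);
        if (done != null) {
          return done.get();
        }
        // Too slow: spawn the hedged request, then take whichever finishes first.
        service.submit(hedge);
        return service.take().get();
        // (a real caller would also shut the pool down)
      }
    }

take() parks the caller until some submitted task completes, which is what keeps the while loop in hedgedFetchBlockByteRange from iterating tens of thousands of times.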

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -739,6 +739,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6558. Missing newline in the description of dfsadmin -rollingUpgrade.
     (Chen He via kihwal)
 
+    HDFS-6591. while loop is executed tens of thousands of times in Hedged Read
+    (Liang Xie via cnauroth)
+
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
 
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -29,6 +31,7 @@
 @InterfaceAudience.Private
 public class DFSClientFaultInjector {
   public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+  public static AtomicLong exceptionNum = new AtomicLong(0);
 
   public static DFSClientFaultInjector get() {
     return instance;
@@ -47,4 +50,6 @@ public boolean failPacket() {
   }
 
   public void startFetchFromDatanode() {}
+
+  public void fetchFromDatanodeException() {}
 }
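The two additions above follow the usual fault-injector pattern: production code calls a no-op hook on a static, replaceable instance, and a test swaps that instance for a Mockito mock whose hook throws, which is exactly what the TestPread change further down does. A rough sketch of the pattern, with hypothetical names:

    // Hypothetical names; DFSClientFaultInjector works the same way.
    class FaultInjector {
      static FaultInjector instance = new FaultInjector();
      void beforeFetch() throws Exception {}   // no-op unless a test overrides it
    }

    class Fetcher {
      byte[] fetch() throws Exception {
        FaultInjector.instance.beforeFetch(); // test hook; does nothing in production
        return new byte[0];                   // real read path would go here
      }
    }

A test can then assign a Mockito mock to FaultInjector.instance and stub beforeFetch() to throw, exercising the failure path without a real datanode fault.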

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -32,12 +32,14 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -81,6 +83,7 @@ public class DFSInputStream extends FSInputStream
     HasEnhancedByteBufferAccess {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
+  private long hedgedReadOpsLoopNumForTesting = 0;
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
@@ -976,20 +979,15 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
       final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch latch) {
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
-        try {
-          byte[] buf = bb.array();
-          int offset = bb.position();
-          actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
-              corruptedBlockMap);
-          return bb;
-        } finally {
-          latch.countDown();
-        }
+        byte[] buf = bb.array();
+        int offset = bb.position();
+        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return bb;
       }
     };
   }
@@ -1018,6 +1016,7 @@ private void actualGetFromOneDataNode(final DNAddrPair datanode,
     BlockReader reader = null;
 
     try {
+      DFSClientFaultInjector.get().fetchFromDatanodeException();
       Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
       int len = (int) (end - start + 1);
       reader = new BlockReaderFactory(dfsClient.getConf()).
@@ -1097,35 +1096,43 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
+    CompletionService<ByteBuffer> hedgedService =
+        new ExecutorCompletionService<ByteBuffer>(
+        dfsClient.getHedgedReadsThreadPool());
     ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     block = getBlockAt(block.getStartOffset(), false);
-    // Latch shared by all outstanding reads. First to finish closes
-    CountDownLatch hasReceivedResult = new CountDownLatch(1);
     while (true) {
+      // see HDFS-6591, this metric is used to verify/catch unnecessary loops
+      hedgedReadOpsLoopNumForTesting++;
       DNAddrPair chosenNode = null;
-      Future<ByteBuffer> future = null;
-      // futures is null if there is no request already executing.
+      // there is no request already executing.
       if (futures.isEmpty()) {
         // chooseDataNode is a commitment. If no node, we go to
         // the NN to reget block locations. Only go here on first read.
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
-        future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-            corruptedBlockMap, hasReceivedResult);
+        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+            chosenNode, block, start, end, bb, corruptedBlockMap);
+        Future<ByteBuffer> firstRequest = hedgedService
+            .submit(getFromDataNodeCallable);
+        futures.add(firstRequest);
         try {
-          future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
-          return;
-        } catch (TimeoutException e) {
+          Future<ByteBuffer> future = hedgedService.poll(
+              dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+          if (future != null) {
+            future.get();
+            return;
+          }
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() +
-                "ms to read from " + chosenNode.info + "; spawning hedged read");
+            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
+                + "ms to read from " + chosenNode.info
+                + "; spawning hedged read");
           }
           // Ignore this node on next go around.
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
-          futures.add(future);
           continue; // no need to refresh block locations
         } catch (InterruptedException e) {
           // Ignore
@@ -1133,25 +1140,31 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
           // Ignore already logged in the call.
         }
       } else {
         // We are starting up a 'hedged' read. We have a read already
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
         // If no nodes to do hedged reads against, pass.
         try {
-          chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          try {
+            chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          } catch (IOException ioe) {
+            chosenNode = chooseDataNode(block, ignored);
+          }
           bb = ByteBuffer.allocate(len);
-          future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-              corruptedBlockMap, hasReceivedResult);
-          futures.add(future);
+          Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+              chosenNode, block, start, end, bb, corruptedBlockMap);
+          Future<ByteBuffer> oneMoreRequest = hedgedService
+              .submit(getFromDataNodeCallable);
+          futures.add(oneMoreRequest);
         } catch (IOException ioe) {
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Failed getting node for hedged read: " +
-                ioe.getMessage());
+            DFSClient.LOG.debug("Failed getting node for hedged read: "
+                + ioe.getMessage());
           }
         }
         // if not succeeded. Submit callables for each datanode in a loop, wait
         // for a fixed interval and get the result from the fastest one.
         try {
-          ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+          ByteBuffer result = getFirstToComplete(hedgedService, futures);
           // cancel the rest.
           cancelAll(futures);
           if (result.array() != buf) { // compare the array pointers
@@ -1163,50 +1176,43 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
           }
           return;
         } catch (InterruptedException ie) {
-          // Ignore
-        } catch (ExecutionException e) {
-          // exception already handled in the call method. getFirstToComplete
-          // will remove the failing future from the list. nothing more to do.
+          // Ignore and retry
         }
         // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
         if (chosenNode != null && chosenNode.info != null) {
           ignored.add(chosenNode.info);
         }
       }
-      // executed if we get an error from a data node
-      block = getBlockAt(block.getStartOffset(), false);
     }
   }
 
-  private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
-      final LocatedBlock block, long start,
-      final long end, final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch hasReceivedResult) {
-    Callable<ByteBuffer> getFromDataNodeCallable =
-        getFromOneDataNode(chosenNode, block, start, end, bb,
-            corruptedBlockMap, hasReceivedResult);
-    return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
+  @VisibleForTesting
+  public long getHedgedReadOpsLoopNumForTesting() {
+    return hedgedReadOpsLoopNumForTesting;
   }
 
-  private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
-      CountDownLatch latch) throws ExecutionException, InterruptedException {
-    latch.await();
-    for (Future<ByteBuffer> future : futures) {
-      if (future.isDone()) {
-        try {
-          return future.get();
-        } catch (ExecutionException e) {
-          // already logged in the Callable
-          futures.remove(future);
-          throw e;
-        }
-      }
+  private ByteBuffer getFirstToComplete(
+      CompletionService<ByteBuffer> hedgedService,
+      ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
+    if (futures.isEmpty()) {
+      throw new InterruptedException("let's retry");
     }
-    throw new InterruptedException("latch has counted down to zero but no"
-        + "result available yet, for safety try to request another one from"
-        + "outside loop, this should be rare");
+    Future<ByteBuffer> future = null;
+    try {
+      future = hedgedService.take();
+      ByteBuffer bb = future.get();
+      futures.remove(future);
+      return bb;
+    } catch (ExecutionException e) {
+      // already logged in the Callable
+      futures.remove(future);
+    } catch (CancellationException ce) {
+      // already logged in the Callable
+      futures.remove(future);
+    }
+    throw new InterruptedException("let's retry");
   }
 
   private void cancelAll(List<Future<ByteBuffer>> futures) {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java

@@ -31,12 +31,16 @@
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -49,7 +53,14 @@
 public class TestPread {
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 4096;
-  boolean simulatedStorage = false;
+  boolean simulatedStorage;
+  boolean isHedgedRead;
+
+  @Before
+  public void setup() {
+    simulatedStorage = false;
+    isHedgedRead = false;
+  }
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
     int replication = 3;// We need > 1 blocks to test out the hedged reads.
@@ -73,7 +84,7 @@ private void writeFile(FileSystem fileSys, Path name) throws IOException {
     // now create the real file
     DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 12 * blockSize,
-        blockSize, (short) 1, seed);
+        blockSize, (short) replication, seed);
   }
 
   private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
@@ -104,8 +115,13 @@ private void doPread(FSDataInputStream stm, long position, byte[] buffer,
     }
 
     if (dfstm != null) {
-      assertEquals("Expected read statistic to be incremented", length, dfstm
-          .getReadStatistics().getTotalBytesRead() - totalRead);
+      if (isHedgedRead) {
+        assertTrue("Expected read statistic to be incremented", length <= dfstm
+            .getReadStatistics().getTotalBytesRead() - totalRead);
+      } else {
+        assertEquals("Expected read statistic to be incremented", length, dfstm
+            .getReadStatistics().getTotalBytesRead() - totalRead);
+      }
     }
   }
@@ -208,7 +224,7 @@ private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys,
     stm.readFully(0, actual);
     checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
   }
 
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.exists(name));
     assertTrue(fileSys.delete(name, true));
@@ -249,6 +265,7 @@ public void testPreadDFSNoChecksum() throws IOException {
    */
   @Test
   public void testHedgedPreadDFSBasic() throws IOException {
+    isHedgedRead = true;
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
     conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 1);
@@ -257,9 +274,73 @@ public void testHedgedPreadDFSBasic() throws IOException {
     // transferTo.
   }
 
+  @Test
+  public void testHedgedReadLoopTooManyTimes() throws IOException {
+    Configuration conf = new Configuration();
+    int numHedgedReadPoolThreads = 5;
+    final int hedgedReadTimeoutMillis = 50;
+
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+        numHedgedReadPoolThreads);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+        hedgedReadTimeoutMillis);
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.instance = Mockito
+        .mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        if (true) {
+          Thread.sleep(hedgedReadTimeoutMillis + 1);
+          if (DFSClientFaultInjector.exceptionNum.compareAndSet(0, 1)) {
+            System.out.println("-------------- throw Checksum Exception");
+            throw new ChecksumException("ChecksumException test", 100);
+          }
+        }
+        return null;
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    FSDataOutputStream output = null;
+    DFSInputStream input = null;
+    String filename = "/hedgedReadMaxOut.dat";
+    try {
+      Path file = new Path(filename);
+      output = fileSys.create(file, (short) 2);
+      byte[] data = new byte[64 * 1024];
+      output.write(data);
+      output.flush();
+      output.write(data);
+      output.flush();
+      output.write(data);
+      output.flush();
+      output.close();
+      byte[] buffer = new byte[64 * 1024];
+      input = dfsClient.open(filename);
+      input.read(0, buffer, 0, 1024);
+      input.close();
+      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+    } catch (BlockMissingException e) {
+      assertTrue(false);
+    } finally {
+      IOUtils.cleanup(null, input);
+      IOUtils.cleanup(null, output);
+      fileSys.close();
+      cluster.shutdown();
+      Mockito.reset(injector);
+    }
+  }
+
   @Test
   public void testMaxOutHedgedReadPool() throws IOException,
       InterruptedException, ExecutionException {
+    isHedgedRead = true;
     Configuration conf = new Configuration();
     int numHedgedReadPoolThreads = 5;
     final int initialHedgedReadTimeoutMillis = 50000;
@@ -367,7 +448,6 @@ private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean
   public void testPreadDFSSimulated() throws IOException {
     simulatedStorage = true;
     testPreadDFS();
-    simulatedStorage = false;
   }
 
   /**