HDFS-4417. Fix case where local reads get disabled incorrectly. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1437616 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2013-01-23 18:38:56 +00:00
parent 89bd14913a
commit d12f465c67
8 changed files with 263 additions and 100 deletions

View File

@ -30,3 +30,6 @@ HDFS-4418. increase default FileInputStreamCache size (todd)
HDFS-4416. Rename dfs.datanode.domain.socket.path to dfs.domain.socket.path
(Colin Patrick McCabe via todd)
HDFS-4417. Fix case where local reads get disabled incorrectly
(Colin Patrick McCabe and todd via todd)

View File

@ -68,7 +68,7 @@ public class BlockReaderFactory {
* case.
* @param allowShortCircuitLocalReads True if short-circuit local reads
* should be allowed.
* @return New BlockReader instance, or null on error.
* @return New BlockReader instance
*/
@SuppressWarnings("deprecation")
public static BlockReader newBlockReader(

View File

@ -35,8 +35,8 @@
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.net.DomainPeer;
@ -56,7 +56,8 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.hdfs.FileInputStreamCache;
import com.google.common.annotations.VisibleForTesting;
/****************************************************************
* DFSInputStream provides bytes from a named file. It handles
@ -64,11 +65,11 @@
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
@VisibleForTesting
static boolean tcpReadsDisabledForTesting = false;
private final PeerCache peerCache;
private final DFSClient dfsClient;
private boolean closed = false;
private final String src;
private final long prefetchSize;
private BlockReader blockReader = null;
@ -853,33 +854,23 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
}
}
private Peer newPeer(InetSocketAddress addr) throws IOException {
private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
Peer peer = null;
boolean success = false;
Socket sock = null;
DomainSocket domSock = null;
try {
domSock = dfsClient.getDomainSocketFactory().create(addr, this);
if (domSock != null) {
// Create a UNIX Domain peer.
peer = new DomainPeer(domSock);
} else {
// Create a conventional TCP-based Peer.
sock = dfsClient.socketFactory.createSocket();
NetUtils.connect(sock, addr,
dfsClient.getRandomLocalInterfaceAddr(),
dfsClient.getConf().socketTimeout);
peer = TcpPeerServer.peerFromSocketAndKey(sock,
dfsClient.getDataEncryptionKey());
}
sock = dfsClient.socketFactory.createSocket();
NetUtils.connect(sock, addr,
dfsClient.getRandomLocalInterfaceAddr(),
dfsClient.getConf().socketTimeout);
peer = TcpPeerServer.peerFromSocketAndKey(sock,
dfsClient.getDataEncryptionKey());
success = true;
return peer;
} finally {
if (!success) {
IOUtils.closeQuietly(peer);
IOUtils.closeQuietly(sock);
IOUtils.closeQuietly(domSock);
}
}
}
@ -888,6 +879,9 @@ private Peer newPeer(InetSocketAddress addr) throws IOException {
* Retrieve a BlockReader suitable for reading.
* This method will reuse the cached connection to the DN if appropriate.
* Otherwise, it will create a new connection.
* Throwing an IOException from this method is basically equivalent to
* declaring the DataNode bad, so we try to connect a lot of different ways
* before doing that.
*
* @param dnAddr Address of the datanode
* @param chosenNode Chosen datanode information
@ -912,9 +906,6 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
boolean verifyChecksum,
String clientName)
throws IOException {
IOException err = null;
// Firstly, we check to see if we have cached any file descriptors for
// local blocks. If so, we can just re-use those file descriptors.
FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
@ -927,67 +918,84 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
}
// We retry several times here.
// On the first nCachedConnRetry times, we try to fetch a socket from
// the socketCache and use it. This may fail, since the old socket may
// have been closed by the peer.
// After that, we try to create a new socket using newPeer().
// This may create either a TCP socket or a UNIX domain socket, depending
// on the configuration and whether the peer is remote.
// If we try to create a UNIX domain socket and fail, we will not try that
// again. Instead, we'll try to create a TCP socket. Only after we've
// failed to create a TCP-based BlockReader will we throw an IOException
// from this function. Throwing an IOException from here is basically
// equivalent to declaring the DataNode bad.
boolean triedNonDomainSocketReader = false;
for (int retries = 0;
retries < nCachedConnRetry || (!triedNonDomainSocketReader);
++retries) {
Peer peer = null;
if (retries < nCachedConnRetry) {
peer = peerCache.get(chosenNode);
}
if (peer == null) {
peer = newPeer(dnAddr);
if (peer.getDomainSocket() == null) {
triedNonDomainSocketReader = true;
}
}
boolean success = false;
// Look for cached domain peers.
int cacheTries = 0;
DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
BlockReader reader = null;
for (; cacheTries < nCachedConnRetry; ++cacheTries) {
Peer peer = peerCache.get(chosenNode, true);
if (peer == null) break;
try {
boolean allowShortCircuitLocalReads =
(peer.getDomainSocket() != null) &&
dfsClient.getConf().shortCircuitLocalReads &&
(!shortCircuitForbidden());
// Here we will try to send either an OP_READ_BLOCK request or an
// OP_REQUEST_SHORT_CIRCUIT_FDS, depending on what kind of block reader
// we're trying to create.
BlockReader blockReader = BlockReaderFactory.newBlockReader(
boolean allowShortCircuitLocalReads = dfsClient.getConf().
shortCircuitLocalReads && (!shortCircuitForbidden());
reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dfsClient.getDomainSocketFactory(), allowShortCircuitLocalReads);
success = true;
return blockReader;
} catch (IOException ex) {
// Our socket is no good.
DFSClient.LOG.debug("Error making BlockReader. " +
dsFactory, allowShortCircuitLocalReads);
return reader;
} catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
"Closing stale " + peer, ex);
if (peer.getDomainSocket() != null) {
// If the Peer that we got the error from was a DomainPeer,
// mark the socket path as bad, so that newDataSocket will not try
// to re-open this socket for a while.
dfsClient.getDomainSocketFactory().
disableDomainSocketPath(peer.getDomainSocket().getPath());
}
err = ex;
} finally {
if (!success) {
if (reader == null) {
IOUtils.closeQuietly(peer);
}
}
}
throw err;
// Try to create a DomainPeer.
DomainSocket domSock = dsFactory.create(dnAddr, this);
if (domSock != null) {
Peer peer = new DomainPeer(domSock);
try {
boolean allowShortCircuitLocalReads = dfsClient.getConf().
shortCircuitLocalReads && (!shortCircuitForbidden());
reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, allowShortCircuitLocalReads);
return reader;
} catch (IOException e) {
DFSClient.LOG.warn("failed to connect to " + domSock, e);
} finally {
if (reader == null) {
// If the Peer that we got the error from was a DomainPeer,
// mark the socket path as bad, so that newDataSocket will not try
// to re-open this socket for a while.
dsFactory.disableDomainSocketPath(domSock.getPath());
IOUtils.closeQuietly(peer);
}
}
}
// Look for cached peers.
for (; cacheTries < nCachedConnRetry; ++cacheTries) {
Peer peer = peerCache.get(chosenNode, false);
if (peer == null) break;
try {
reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, false);
return reader;
} catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
peer, ex);
} finally {
if (reader == null) {
IOUtils.closeQuietly(peer);
}
}
}
if (tcpReadsDisabledForTesting) {
throw new IOException("TCP reads are disabled.");
}
// Try to create a new remote peer.
Peer peer = newTcpPeer(dnAddr);
return BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, false);
}

View File

@ -39,6 +39,30 @@
class PeerCache {
private static final Log LOG = LogFactory.getLog(PeerCache.class);
private static class Key {
final DatanodeID dnID;
final boolean isDomain;
Key(DatanodeID dnID, boolean isDomain) {
this.dnID = dnID;
this.isDomain = isDomain;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof Key)) {
return false;
}
Key other = (Key)o;
return dnID.equals(other.dnID) && isDomain == other.isDomain;
}
@Override
public int hashCode() {
return dnID.hashCode() ^ (isDomain ? 1 : 0);
}
}
private static class Value {
private final Peer peer;
private final long time;
@ -59,7 +83,7 @@ long getTime() {
private Daemon daemon;
/** A map for per user per datanode. */
private static LinkedListMultimap<DatanodeID, Value> multimap =
private static LinkedListMultimap<Key, Value> multimap =
LinkedListMultimap.create();
private static int capacity;
private static long expiryPeriod;
@ -124,16 +148,18 @@ public String toString() {
/**
* Get a cached peer connected to the given DataNode.
* @param dnId The DataNode to get a Peer for.
* @param isDomain Whether to retrieve a DomainPeer or not.
*
* @return An open Peer connected to the DN, or null if none
* was found.
*/
public synchronized Peer get(DatanodeID dnId) {
public synchronized Peer get(DatanodeID dnId, boolean isDomain) {
if (capacity <= 0) { // disabled
return null;
}
List<Value> sockStreamList = multimap.get(dnId);
List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
if (sockStreamList == null) {
return null;
}
@ -168,7 +194,8 @@ public synchronized void put(DatanodeID dnId, Peer peer) {
if (capacity == multimap.size()) {
evictOldest();
}
multimap.put(dnId, new Value(peer, Time.monotonicNow()));
multimap.put(new Key(dnId, peer.getDomainSocket() != null),
new Value(peer, Time.monotonicNow()));
}
public synchronized int size() {
@ -180,9 +207,9 @@ public synchronized int size() {
*/
private synchronized void evictExpired(long expiryPeriod) {
while (multimap.size() != 0) {
Iterator<Entry<DatanodeID, Value>> iter =
Iterator<Entry<Key, Value>> iter =
multimap.entries().iterator();
Entry<DatanodeID, Value> entry = iter.next();
Entry<Key, Value> entry = iter.next();
// if oldest socket expired, remove it
if (entry == null ||
Time.monotonicNow() - entry.getValue().getTime() <
@ -201,13 +228,13 @@ private synchronized void evictOldest() {
// We can get the oldest element immediately, because of an interesting
// property of LinkedListMultimap: its iterator traverses entries in the
// order that they were added.
Iterator<Entry<DatanodeID, Value>> iter =
Iterator<Entry<Key, Value>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
throw new IllegalStateException("Cannot evict from empty cache! " +
"capacity: " + capacity);
}
Entry<DatanodeID, Value> entry = iter.next();
Entry<Key, Value> entry = iter.next();
IOUtils.cleanup(LOG, entry.getValue().getPeer());
iter.remove();
}

View File

@ -70,7 +70,6 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

View File

@ -114,7 +114,7 @@ public void testKeepaliveTimeouts() throws Exception {
// Take it out of the cache - reading should
// give an EOF.
Peer peer = dfsClient.peerCache.get(dn.getDatanodeId());
Peer peer = dfsClient.peerCache.get(dn.getDatanodeId(), false);
assertNotNull(peer);
assertEquals(-1, peer.getInputStream().read());
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.File;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.*;
/**
* This class tests short-circuit local reads without any FileInputStream or
* Socket caching. This is a regression test for HDFS-4417.
*/
public class TestParallelShortCircuitReadUnCached extends TestParallelReadUtil {
private static TemporarySocketDirectory sockDir;
@BeforeClass
static public void setupCluster() throws Exception {
if (DomainSocket.getLoadingFailureReason() != null) return;
sockDir = new TemporarySocketDirectory();
HdfsConfiguration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
"TestParallelShortCircuitReadUnCached._PORT.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
// We want to test reading from stale sockets.
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
5 * 60 * 1000);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
// Avoid using the FileInputStreamCache.
conf.setInt(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, 0);
DomainSocket.disableBindPathValidation();
DFSInputStream.tcpReadsDisabledForTesting = true;
setupCluster(1, conf);
}
@Before
public void before() {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
}
@AfterClass
static public void teardownCluster() throws Exception {
if (DomainSocket.getLoadingFailureReason() != null) return;
sockDir.close();
TestParallelReadUtil.teardownCluster();
}
}

View File

@ -33,22 +33,25 @@
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.net.unix.DomainSocket;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestPeerCache {
static final Log LOG = LogFactory.getLog(TestPeerCache.class);
private static final int CAPACITY = 3;
private static final int EXPIRY_PERIOD = 20;
private static PeerCache cache =
PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
private static class FakePeer implements Peer {
private boolean closed = false;
private final boolean hasDomain;
private DatanodeID dnId;
public FakePeer(DatanodeID dnId) {
public FakePeer(DatanodeID dnId, boolean hasDomain) {
this.dnId = dnId;
this.hasDomain = hasDomain;
}
@Override
@ -118,39 +121,50 @@ public String toString() {
@Override
public DomainSocket getDomainSocket() {
return null;
if (!hasDomain) return null;
// Return a mock which throws an exception whenever any function is
// called.
return Mockito.mock(DomainSocket.class,
new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation)
throws Throwable {
throw new RuntimeException("injected fault.");
} });
}
}
@Test
public void testAddAndRetrieve() throws Exception {
PeerCache cache = PeerCache.getInstance(3, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id",
100, 101, 102);
FakePeer peer = new FakePeer(dnId);
FakePeer peer = new FakePeer(dnId, false);
cache.put(dnId, peer);
assertTrue(!peer.isClosed());
assertEquals(1, cache.size());
assertEquals(peer, cache.get(dnId));
assertEquals(peer, cache.get(dnId, false));
assertEquals(0, cache.size());
cache.clear();
}
@Test
public void testExpiry() throws Exception {
final int CAPACITY = 3;
final int EXPIRY_PERIOD = 10;
PeerCache cache = PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
DatanodeID dnIds[] = new DatanodeID[CAPACITY];
FakePeer peers[] = new FakePeer[CAPACITY];
for (int i = 0; i < CAPACITY; ++i) {
dnIds[i] = new DatanodeID("192.168.0.1",
"fakehostname_" + i, "fake_storage_id",
100, 101, 102);
peers[i] = new FakePeer(dnIds[i]);
peers[i] = new FakePeer(dnIds[i], false);
}
for (int i = 0; i < CAPACITY; ++i) {
cache.put(dnIds[i], peers[i]);
}
// Check that the peers are cached
assertEquals(CAPACITY, cache.size());
// Wait for the peers to expire
Thread.sleep(EXPIRY_PERIOD * 50);
@ -169,13 +183,15 @@ public void testExpiry() throws Exception {
@Test
public void testEviction() throws Exception {
final int CAPACITY = 3;
PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
FakePeer peers[] = new FakePeer[CAPACITY + 1];
for (int i = 0; i < dnIds.length; ++i) {
dnIds[i] = new DatanodeID("192.168.0.1",
"fakehostname_" + i, "fake_storage_id_" + i,
100, 101, 102);
peers[i] = new FakePeer(dnIds[i]);
peers[i] = new FakePeer(dnIds[i], false);
}
for (int i = 0; i < CAPACITY; ++i) {
cache.put(dnIds[i], peers[i]);
@ -186,11 +202,11 @@ public void testEviction() throws Exception {
// Add another entry and check that the first entry was evicted
cache.put(dnIds[CAPACITY], peers[CAPACITY]);
assertEquals(CAPACITY, cache.size());
assertSame(null, cache.get(dnIds[0]));
assertSame(null, cache.get(dnIds[0], false));
// Make sure that the other entries are still there
for (int i = 1; i < CAPACITY; ++i) {
Peer peer = cache.get(dnIds[i]);
Peer peer = cache.get(dnIds[i], false);
assertSame(peers[i], peer);
assertTrue(!peer.isClosed());
peer.close();
@ -201,19 +217,56 @@ public void testEviction() throws Exception {
@Test
public void testMultiplePeersWithSameDnId() throws Exception {
final int CAPACITY = 3;
PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id",
100, 101, 102);
HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
for (int i = 0; i < CAPACITY; ++i) {
FakePeer peer = new FakePeer(dnId);
FakePeer peer = new FakePeer(dnId, false);
peers.add(peer);
cache.put(dnId, peer);
}
// Check that all of the peers ended up in the cache
assertEquals(CAPACITY, cache.size());
while (!peers.isEmpty()) {
Peer peer = cache.get(dnId);
Peer peer = cache.get(dnId, false);
assertTrue(peer != null);
assertTrue(!peer.isClosed());
peers.remove(peer);
}
assertEquals(0, cache.size());
cache.clear();
}
@Test
public void testDomainSocketPeers() throws Exception {
final int CAPACITY = 3;
PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id",
100, 101, 102);
HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
for (int i = 0; i < CAPACITY; ++i) {
FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1);
peers.add(peer);
cache.put(dnId, peer);
}
// Check that all of the peers ended up in the cache
assertEquals(CAPACITY, cache.size());
// Test that get(requireDomainPeer=true) finds the peer with the
// domain socket.
Peer peer = cache.get(dnId, true);
assertTrue(peer.getDomainSocket() != null);
peers.remove(peer);
// Test that get(requireDomainPeer=true) returns null when there are
// no more peers with domain sockets.
peer = cache.get(dnId, true);
assertTrue(peer == null);
// Check that all of the other peers ended up in the cache.
while (!peers.isEmpty()) {
peer = cache.get(dnId, false);
assertTrue(peer != null);
assertTrue(!peer.isClosed());
peers.remove(peer);