diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java index 6b3d8e07ce..dfce230403 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java @@ -497,4 +497,14 @@ public String toString() { public DomainSocketWatcher getDomainSocketWatcher() { return domainSocketWatcher; } + + @VisibleForTesting + public int getShmNum() { + int segments = 0; + for (EndpointShmManager endpointShmManager : datanodes.values()) { + segments += + endpointShmManager.notFull.size() + endpointShmManager.full.size(); + } + return segments; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java index d4d898c892..a9970c2f3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java @@ -22,6 +22,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.SocketException; import java.nio.MappedByteBuffer; import java.util.HashMap; import java.util.Map; @@ -181,25 +182,52 @@ private class SlotReleaser implements Runnable { @Override public void run() { + if (slot == null) { + return; + } LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot); final DfsClientShm shm = (DfsClientShm)slot.getShm(); final DomainSocket shmSock = shm.getPeer().getDomainSocket(); final String path = shmSock.getPath(); + DataOutputStream out = null; boolean success = false; - try (DomainSocket sock = DomainSocket.connect(path); - DataOutputStream out = new DataOutputStream( - new BufferedOutputStream(sock.getOutputStream()))) { - new Sender(out).releaseShortCircuitFds(slot.getSlotId()); - DataInputStream in = new DataInputStream(sock.getInputStream()); - ReleaseShortCircuitAccessResponseProto resp = - ReleaseShortCircuitAccessResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - if (resp.getStatus() != Status.SUCCESS) { - String error = resp.hasError() ? resp.getError() : "(unknown)"; - throw new IOException(resp.getStatus().toString() + ": " + error); + int retries = 2; + try { + while (retries > 0) { + try { + if (domainSocket == null || !domainSocket.isOpen()) { + // we are running in single thread mode, no protection needed for + // domainSocket + domainSocket = DomainSocket.connect(path); + } + + out = new DataOutputStream( + new BufferedOutputStream(domainSocket.getOutputStream())); + new Sender(out).releaseShortCircuitFds(slot.getSlotId()); + DataInputStream in = + new DataInputStream(domainSocket.getInputStream()); + ReleaseShortCircuitAccessResponseProto resp = + ReleaseShortCircuitAccessResponseProto + .parseFrom(PBHelperClient.vintPrefixed(in)); + if (resp.getStatus() != Status.SUCCESS) { + String error = resp.hasError() ? resp.getError() : "(unknown)"; + throw new IOException(resp.getStatus().toString() + ": " + error); + } + + LOG.trace("{}: released {}", this, slot); + success = true; + break; + + } catch (SocketException se) { + // the domain socket on datanode may be timed out, we retry once + retries--; + domainSocket.close(); + domainSocket = null; + if (retries == 0) { + throw new SocketException("Create domain socket failed"); + } + } } - LOG.trace("{}: released {}", this, slot); - success = true; } catch (IOException e) { LOG.warn(ShortCircuitCache.this + ": failed to release " + "short-circuit shared memory slot " + slot + " by sending " @@ -211,6 +239,8 @@ public void run() { shmManager.freeSlot(slot); } else { shm.getEndpointShmManager().shutdown(shm); + IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out); + domainSocket = null; } } } @@ -324,6 +354,8 @@ public interface ShortCircuitReplicaCreator { */ private final DfsClientShmManager shmManager; + private DomainSocket domainSocket = null; + public static ShortCircuitCache fromConf(ShortCircuitConf conf) { return new ShortCircuitCache( conf.getShortCircuitStreamsCacheSize(), @@ -997,6 +1029,9 @@ public void freeSlot(Slot slot) { * @param slot The slot to release. */ public void scheduleSlotReleaser(Slot slot) { + if (slot == null) { + return; + } Preconditions.checkState(shmManager != null); releaserExecutor.execute(new SlotReleaser(slot)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java index 3df83cfbad..4b35109326 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java @@ -404,4 +404,9 @@ boolean accept(HashMap segments, public synchronized boolean visit(Visitor visitor) { return visitor.accept(segments, slots); } + + @VisibleForTesting + public int getShmNum() { + return segments.size(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 53cac2adee..e637f4ceed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.hamcrest.CoreMatchers.equalTo; import java.io.DataOutputStream; @@ -28,6 +29,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -910,4 +912,94 @@ public void testRequestFileDescriptorsWhenULimit() throws Exception { } } } + + @Test(timeout = 60000) + public void testDomainSocketClosedByDN() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = + createShortCircuitConf("testDomainSocketClosedByDN", sockDir); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + + try { + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + final ShortCircuitCache cache = + fs.getClient().getClientContext().getShortCircuitCache(); + DomainPeer peer = getDomainPeerToDn(conf); + MutableBoolean usedPeer = new MutableBoolean(false); + ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz"); + final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder() + .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build(); + // Allocating the first shm slot requires using up a peer. + Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId, + "testReleaseSlotReuseDomainSocket_client"); + + cluster.getDataNodes().get(0).getShortCircuitRegistry() + .registerSlot(blockId, slot1.getSlotId(), false); + + Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId, + "testReleaseSlotReuseDomainSocket_client"); + + cluster.getDataNodes().get(0).getShortCircuitRegistry() + .registerSlot(blockId, slot2.getSlotId(), false); + + cache.scheduleSlotReleaser(slot1); + + Thread.sleep(2000); + cache.scheduleSlotReleaser(slot2); + Thread.sleep(2000); + Assert.assertEquals(0, + cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum()); + Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum()); + } finally { + cluster.shutdown(); + } + } + + @Test(timeout = 60000) + public void testDNRestart() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf("testDNRestart", sockDir); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + try { + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + final ShortCircuitCache cache = + fs.getClient().getClientContext().getShortCircuitCache(); + DomainPeer peer = getDomainPeerToDn(conf); + MutableBoolean usedPeer = new MutableBoolean(false); + ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz"); + final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder() + .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build(); + // Allocating the first shm slot requires using up a peer. + Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId, + "testReleaseSlotReuseDomainSocket_client"); + + cluster.getDataNodes().get(0).getShortCircuitRegistry() + .registerSlot(blockId, slot1.getSlotId(), false); + + // restart the datanode to invalidate the cache + cluster.restartDataNode(0); + Thread.sleep(1000); + // after the restart, new allocation and release should not be affect + cache.scheduleSlotReleaser(slot1); + + Slot slot2 = null; + try { + slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId, + "testReleaseSlotReuseDomainSocket_client"); + } catch (ClosedChannelException ce) { + + } + cache.scheduleSlotReleaser(slot2); + Thread.sleep(2000); + Assert.assertEquals(0, + cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum()); + Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum()); + } finally { + cluster.shutdown(); + } + } }