diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java index a950388a31..df2a92c75c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java @@ -189,6 +189,7 @@ public void run() { final DfsClientShm shm = (DfsClientShm)slot.getShm(); final DomainSocket shmSock = shm.getPeer().getDomainSocket(); final String path = shmSock.getPath(); + DomainSocket domainSocket = pathToDomainSocket.get(path); DataOutputStream out = null; boolean success = false; int retries = 2; @@ -196,9 +197,10 @@ public void run() { while (retries > 0) { try { if (domainSocket == null || !domainSocket.isOpen()) { - // we are running in single thread mode, no protection needed for - // domainSocket domainSocket = DomainSocket.connect(path); + // we are running in single thread mode, no protection needed for + // pathToDomainSocket + pathToDomainSocket.put(path, domainSocket); } out = new DataOutputStream( @@ -221,13 +223,16 @@ public void run() { } catch (SocketException se) { // the domain socket on datanode may be timed out, we retry once retries--; - domainSocket.close(); - domainSocket = null; + if (domainSocket != null) { + domainSocket.close(); + domainSocket = null; + pathToDomainSocket.remove(path); + } if (retries == 0) { throw new SocketException("Create domain socket failed"); } } - } + } // end of while block } catch (IOException e) { LOG.warn(ShortCircuitCache.this + ": failed to release " + "short-circuit shared memory slot " + slot + " by sending " @@ -240,10 +245,10 @@ public void run() { } else { shm.getEndpointShmManager().shutdown(shm); IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out); - domainSocket = null; + pathToDomainSocket.remove(path); } } - } + } // end of run() } public interface ShortCircuitReplicaCreator { @@ -354,7 +359,11 @@ public interface ShortCircuitReplicaCreator { */ private final DfsClientShmManager shmManager; - private DomainSocket domainSocket = null; + /** + * A map contains all DomainSockets used in SlotReleaser. Keys are the domain socket + * paths of short-circuit shared memory segments. + */ + private Map pathToDomainSocket = new HashMap<>(); public static ShortCircuitCache fromConf(ShortCircuitConf conf) { return new ShortCircuitCache( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 9754da3348..965ac0ac98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm; @@ -957,6 +958,83 @@ public void testDomainSocketClosedByDN() throws Exception { } } + // Regression test for HDFS-16535 + @Test(timeout = 60000) + public void testDomainSocketClosedByMultipleDNs() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + String testName = "testDomainSocketClosedByMultipleDNs"; + Configuration conf = createShortCircuitConf(testName, sockDir); + conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), + testName + "._PORT").getAbsolutePath()); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + + try { + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + final ShortCircuitCache cache = + fs.getClient().getClientContext().getShortCircuitCache(); + + ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz"); + ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz"); + + DataNode dn0 = cluster.getDataNodes().get(0); + DataNode dn1 = cluster.getDataNodes().get(1); + + DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File( + sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath())); + DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File( + sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath())); + + final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder() + .setNodeID(dn0.getDatanodeId()).build(); + final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder() + .setNodeID(dn1.getDatanodeId()).build(); + + // Allocate 2 shm slots from DataNode-0 + MutableBoolean usedPeer = new MutableBoolean(false); + Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0, + "testDomainSocketClosedByMultipleDNs_client"); + dn0.getShortCircuitRegistry() + .registerSlot(blockId0, slot1.getSlotId(), false); + + Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0, + "testDomainSocketClosedByMultipleDNs_client"); + dn0.getShortCircuitRegistry() + .registerSlot(blockId0, slot2.getSlotId(), false); + + // Allocate 1 shm slot from DataNode-1 + Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1, + "testDomainSocketClosedByMultipleDNs_client"); + dn1.getShortCircuitRegistry() + .registerSlot(blockId1, slot3.getSlotId(), false); + + Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum()); + Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum()); + Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum()); + + // Release the slot of DataNode-1 first. + cache.scheduleSlotReleaser(slot3); + Thread.sleep(2000); + Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum()); + + // Release the slots of DataNode-0. + cache.scheduleSlotReleaser(slot1); + Thread.sleep(2000); + Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" + + " due to slot release failures.", + 1, cache.getDfsClientShmManager().getShmNum()); + cache.scheduleSlotReleaser(slot2); + Thread.sleep(2000); + + Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum()); + Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum()); + Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum()); + } finally { + cluster.shutdown(); + } + } + @Test(timeout = 60000) public void testDNRestart() throws Exception { TemporarySocketDirectory sockDir = new TemporarySocketDirectory();