HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (#4158)

Reviewed-by: Lisheng Sun <sunlisheng@apache.org>
This commit is contained in:
Quanlong Huang 2022-04-18 10:32:29 +08:00 committed by GitHub
parent dbeeee0363
commit 35d4c02bcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 95 additions and 8 deletions

View File

@ -189,6 +189,7 @@ public void run() {
final DfsClientShm shm = (DfsClientShm)slot.getShm(); final DfsClientShm shm = (DfsClientShm)slot.getShm();
final DomainSocket shmSock = shm.getPeer().getDomainSocket(); final DomainSocket shmSock = shm.getPeer().getDomainSocket();
final String path = shmSock.getPath(); final String path = shmSock.getPath();
DomainSocket domainSocket = pathToDomainSocket.get(path);
DataOutputStream out = null; DataOutputStream out = null;
boolean success = false; boolean success = false;
int retries = 2; int retries = 2;
@ -196,9 +197,10 @@ public void run() {
while (retries > 0) { while (retries > 0) {
try { try {
if (domainSocket == null || !domainSocket.isOpen()) { if (domainSocket == null || !domainSocket.isOpen()) {
// we are running in single thread mode, no protection needed for
// domainSocket
domainSocket = DomainSocket.connect(path); domainSocket = DomainSocket.connect(path);
// we are running in single thread mode, no protection needed for
// pathToDomainSocket
pathToDomainSocket.put(path, domainSocket);
} }
out = new DataOutputStream( out = new DataOutputStream(
@ -221,13 +223,16 @@ public void run() {
} catch (SocketException se) { } catch (SocketException se) {
// the domain socket on datanode may be timed out, we retry once // the domain socket on datanode may be timed out, we retry once
retries--; retries--;
domainSocket.close(); if (domainSocket != null) {
domainSocket = null; domainSocket.close();
domainSocket = null;
pathToDomainSocket.remove(path);
}
if (retries == 0) { if (retries == 0) {
throw new SocketException("Create domain socket failed"); throw new SocketException("Create domain socket failed");
} }
} }
} } // end of while block
} catch (IOException e) { } catch (IOException e) {
LOG.warn(ShortCircuitCache.this + ": failed to release " LOG.warn(ShortCircuitCache.this + ": failed to release "
+ "short-circuit shared memory slot " + slot + " by sending " + "short-circuit shared memory slot " + slot + " by sending "
@ -240,10 +245,10 @@ public void run() {
} else { } else {
shm.getEndpointShmManager().shutdown(shm); shm.getEndpointShmManager().shutdown(shm);
IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out); IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
domainSocket = null; pathToDomainSocket.remove(path);
} }
} }
} } // end of run()
} }
public interface ShortCircuitReplicaCreator { public interface ShortCircuitReplicaCreator {
@ -354,7 +359,11 @@ public interface ShortCircuitReplicaCreator {
*/ */
private final DfsClientShmManager shmManager; private final DfsClientShmManager shmManager;
private DomainSocket domainSocket = null; /**
* A map contains all DomainSockets used in SlotReleaser. Keys are the domain socket
* paths of short-circuit shared memory segments.
*/
private Map<String, DomainSocket> pathToDomainSocket = new HashMap<>();
public static ShortCircuitCache fromConf(ShortCircuitConf conf) { public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
return new ShortCircuitCache( return new ShortCircuitCache(

View File

@ -63,6 +63,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
@ -957,6 +958,83 @@ public void testDomainSocketClosedByDN() throws Exception {
} }
} }
// Regression test for HDFS-16535
@Test(timeout = 60000)
public void testDomainSocketClosedByMultipleDNs() throws Exception {
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
String testName = "testDomainSocketClosedByMultipleDNs";
Configuration conf = createShortCircuitConf(testName, sockDir);
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
testName + "._PORT").getAbsolutePath());
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
try {
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz");
ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz");
DataNode dn0 = cluster.getDataNodes().get(0);
DataNode dn1 = cluster.getDataNodes().get(1);
DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File(
sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath()));
DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File(
sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath()));
final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder()
.setNodeID(dn0.getDatanodeId()).build();
final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder()
.setNodeID(dn1.getDatanodeId()).build();
// Allocate 2 shm slots from DataNode-0
MutableBoolean usedPeer = new MutableBoolean(false);
Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
"testDomainSocketClosedByMultipleDNs_client");
dn0.getShortCircuitRegistry()
.registerSlot(blockId0, slot1.getSlotId(), false);
Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
"testDomainSocketClosedByMultipleDNs_client");
dn0.getShortCircuitRegistry()
.registerSlot(blockId0, slot2.getSlotId(), false);
// Allocate 1 shm slot from DataNode-1
Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1,
"testDomainSocketClosedByMultipleDNs_client");
dn1.getShortCircuitRegistry()
.registerSlot(blockId1, slot3.getSlotId(), false);
Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum());
Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum());
Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum());
// Release the slot of DataNode-1 first.
cache.scheduleSlotReleaser(slot3);
Thread.sleep(2000);
Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum());
// Release the slots of DataNode-0.
cache.scheduleSlotReleaser(slot1);
Thread.sleep(2000);
Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" +
" due to slot release failures.",
1, cache.getDfsClientShmManager().getShmNum());
cache.scheduleSlotReleaser(slot2);
Thread.sleep(2000);
Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum());
Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum());
Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
} finally {
cluster.shutdown();
}
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testDNRestart() throws Exception { public void testDNRestart() throws Exception {
TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); TemporarySocketDirectory sockDir = new TemporarySocketDirectory();