HDFS-13121. NPE when request file descriptors when SC read. Contributed by Zsolt Venczel.
This commit is contained in:
parent
061b168529
commit
0247cb6318
@ -598,6 +598,11 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
|
|||||||
sock.recvFileInputStreams(fis, buf, 0, buf.length);
|
sock.recvFileInputStreams(fis, buf, 0, buf.length);
|
||||||
ShortCircuitReplica replica = null;
|
ShortCircuitReplica replica = null;
|
||||||
try {
|
try {
|
||||||
|
if (fis[0] == null || fis[1] == null) {
|
||||||
|
throw new IOException("the datanode " + datanode + " failed to " +
|
||||||
|
"pass a file descriptor (might have reached open file limit).");
|
||||||
|
}
|
||||||
|
|
||||||
ExtendedBlockId key =
|
ExtendedBlockId key =
|
||||||
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
||||||
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
|
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
|
||||||
|
@ -42,6 +42,10 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.ClientContext;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
|
import org.apache.hadoop.hdfs.PeerCache;
|
||||||
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
|
||||||
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
@ -50,10 +54,12 @@
|
|||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.net.DomainPeer;
|
import org.apache.hadoop.hdfs.net.DomainPeer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
||||||
@ -66,9 +72,11 @@
|
|||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
@ -819,4 +827,85 @@ public void testFetchOrCreateRetries() throws Exception {
|
|||||||
.fetch(Mockito.eq(extendedBlockId), Mockito.any());
|
.fetch(Mockito.eq(extendedBlockId), Mockito.any());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestFileDescriptorsWhenULimit() throws Exception {
|
||||||
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
||||||
|
Configuration conf = createShortCircuitConf(
|
||||||
|
"testRequestFileDescriptorsWhenULimit", sockDir);
|
||||||
|
|
||||||
|
final short replicas = 1;
|
||||||
|
final int fileSize = 3;
|
||||||
|
final String testFile = "/testfile";
|
||||||
|
|
||||||
|
try (MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(replicas).build()) {
|
||||||
|
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
DFSTestUtil.createFile(fs, new Path(testFile), fileSize, replicas, 0L);
|
||||||
|
|
||||||
|
LocatedBlock blk = new DFSClient(DFSUtilClient.getNNAddress(conf), conf)
|
||||||
|
.getLocatedBlocks(testFile, 0, fileSize).get(0);
|
||||||
|
|
||||||
|
ClientContext clientContext = Mockito.mock(ClientContext.class);
|
||||||
|
Mockito.when(clientContext.getPeerCache()).thenAnswer(
|
||||||
|
(Answer<PeerCache>) peerCacheCall -> {
|
||||||
|
PeerCache peerCache = new PeerCache(10, Long.MAX_VALUE);
|
||||||
|
DomainPeer peer = Mockito.spy(getDomainPeerToDn(conf));
|
||||||
|
peerCache.put(blk.getLocations()[0], peer);
|
||||||
|
|
||||||
|
Mockito.when(peer.getDomainSocket()).thenAnswer(
|
||||||
|
(Answer<DomainSocket>) domainSocketCall -> {
|
||||||
|
DomainSocket domainSocket = Mockito.mock(DomainSocket.class);
|
||||||
|
Mockito.when(domainSocket
|
||||||
|
.recvFileInputStreams(
|
||||||
|
Mockito.any(FileInputStream[].class),
|
||||||
|
Mockito.any(byte[].class),
|
||||||
|
Mockito.anyInt(),
|
||||||
|
Mockito.anyInt())
|
||||||
|
).thenAnswer(
|
||||||
|
// we are mocking the FileOutputStream array with nulls
|
||||||
|
(Answer<Void>) recvFileInputStreamsCall -> null
|
||||||
|
);
|
||||||
|
return domainSocket;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return peerCache;
|
||||||
|
});
|
||||||
|
|
||||||
|
Mockito.when(clientContext.getShortCircuitCache()).thenAnswer(
|
||||||
|
(Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
|
||||||
|
ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
|
||||||
|
Mockito.when(cache.allocShmSlot(
|
||||||
|
Mockito.any(DatanodeInfo.class),
|
||||||
|
Mockito.any(DomainPeer.class),
|
||||||
|
Mockito.any(MutableBoolean.class),
|
||||||
|
Mockito.any(ExtendedBlockId.class),
|
||||||
|
Mockito.anyString()))
|
||||||
|
.thenAnswer((Answer<Slot>) call -> null);
|
||||||
|
|
||||||
|
return cache;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
DatanodeInfo[] nodes = blk.getLocations();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Assert.assertNull(new BlockReaderFactory(new DfsClientConf(conf))
|
||||||
|
.setInetSocketAddress(NetUtils.createSocketAddr(nodes[0]
|
||||||
|
.getXferAddr()))
|
||||||
|
.setClientCacheContext(clientContext)
|
||||||
|
.setDatanodeInfo(blk.getLocations()[0])
|
||||||
|
.setBlock(blk.getBlock())
|
||||||
|
.setBlockToken(new Token())
|
||||||
|
.createShortCircuitReplicaInfo());
|
||||||
|
} catch (NullPointerException ex) {
|
||||||
|
Assert.fail("Should not throw NPE when the native library is unable " +
|
||||||
|
"to create new files!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user