HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there is an I/O error during requestShortCircuitShm (cmccabe)

Author: Colin Patrick Mccabe
Date:   2015-04-23 18:59:52 -07:00
Commit: a0e0a63209
Parent: bcf89ddc7d

8 changed files with 113 additions and 18 deletions

File: CHANGES.txt

@@ -522,6 +522,8 @@ Release 2.8.0 - UNRELEASED
     split calculation (gera)

   BUG FIXES

+    HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there
+    is an I/O error during requestShortCircuitShm (cmccabe)
     HADOOP-10027. *Compressor_deflateBytesDirect passes instance instead of
     jclass to GetStaticObjectField. (Hui Zheng via cnauroth)
@@ -574,6 +576,9 @@ Release 2.7.1 - UNRELEASED
     HADOOP-11730. Regression: s3n read failure recovery broken.
     (Takenori Sato via stevel)

+    HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there
+    is an I/O error during requestShortCircuitShm (cmccabe)
+
 Release 2.7.0 - 2015-04-20

   INCOMPATIBLE CHANGES

File: DomainSocketWatcher.java

@@ -512,8 +512,8 @@ public void run() {
         }
       } catch (InterruptedException e) {
         LOG.info(toString() + " terminating on InterruptedException");
-      } catch (IOException e) {
-        LOG.error(toString() + " terminating on IOException", e);
+      } catch (Throwable e) {
+        LOG.error(toString() + " terminating on exception", e);
       } finally {
         lock.lock();
         try {
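The change above widens the catch from IOException to Throwable: DomainSocketWatcher runs as a single long-lived watcher thread, and if that thread dies on an unlogged RuntimeException or Error, short-circuit clients stall with no diagnostics in the log. A minimal sketch of the pattern, with hypothetical names (WatcherLoop, doPoll) standing in for Hadoop's actual classes:

import java.io.IOException;

// Sketch of a long-lived watcher thread that must not die silently.
// WatcherLoop and doPoll() are hypothetical stand-ins, not Hadoop classes.
public class WatcherLoop implements Runnable {
  private volatile boolean closed = false;

  public void stop() {
    closed = true;
  }

  @Override
  public void run() {
    try {
      while (!closed) {
        doPoll();  // may throw IOException, or something unexpected
      }
    } catch (InterruptedException e) {
      System.out.println(this + " terminating on InterruptedException");
    } catch (Throwable t) {
      // Catch everything, not just IOException: an unlogged RuntimeException
      // here would kill the only watcher thread without a trace.
      System.err.println(this + " terminating on unexpected exception");
      t.printStackTrace();
    } finally {
      // release locks and wake up waiters here, mirroring the finally block above
    }
  }

  private void doPoll() throws IOException, InterruptedException {
    Thread.sleep(10);  // placeholder for the real poll() call
  }
}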

File: DomainSocketWatcher.c

@@ -111,7 +111,7 @@ JNIEnv *env, jobject obj, jint fd)
   pollfd = &sd->pollfd[sd->used_size];
   sd->used_size++;
   pollfd->fd = fd;
-  pollfd->events = POLLIN;
+  pollfd->events = POLLIN | POLLHUP;
   pollfd->revents = 0;
 }
@@ -162,7 +162,10 @@ JNIEnv *env, jobject obj)
     GetLongField(env, obj, fd_set_data_fid);
   used_size = sd->used_size;
   for (i = 0; i < used_size; i++) {
-    if (sd->pollfd[i].revents & POLLIN) {
+    // We check for both POLLIN and POLLHUP, because on some OSes, when a socket
+    // is shutdown(), it sends POLLHUP rather than POLLIN.
+    if ((sd->pollfd[i].revents & POLLIN) ||
+        (sd->pollfd[i].revents & POLLHUP)) {
       num_readable++;
     } else {
       sd->pollfd[i].revents = 0;
@@ -177,7 +180,8 @@ JNIEnv *env, jobject obj)
   }
   j = 0;
   for (i = 0; ((i < used_size) && (j < num_readable)); i++) {
-    if (sd->pollfd[i].revents & POLLIN) {
+    if ((sd->pollfd[i].revents & POLLIN) ||
+        (sd->pollfd[i].revents & POLLHUP)) {
       carr[j] = sd->pollfd[i].fd;
       j++;
       sd->pollfd[i].revents = 0;

File: DataNodeFaultInjector.java

@@ -39,4 +39,6 @@ public static DataNodeFaultInjector get() {
   public void getHdfsBlocksMetadata() {}

   public void writeBlockAfterFlush() throws IOException {}
+
+  public void sendShortCircuitShmResponse() throws IOException {}
 }
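DataNodeFaultInjector is the DataNode's test hook: production code calls a no-op method on a replaceable singleton, and a test swaps in an implementation (or a Mockito mock, as the new test below does) that throws. A rough sketch of that pattern under hypothetical names (FaultInjector, Server), not Hadoop's actual classes:

import java.io.IOException;

// Sketch of the fault-injector pattern: a mutable singleton whose methods are
// no-ops in production and are overridden by tests to inject failures.
// FaultInjector and Server are hypothetical names, not Hadoop classes.
class FaultInjector {
  static FaultInjector instance = new FaultInjector();
  static FaultInjector get() { return instance; }

  // No-op in production; overridden or mocked in tests.
  public void beforeSendResponse() throws IOException {}
}

class Server {
  void sendResponse() throws IOException {
    FaultInjector.get().beforeSendResponse();  // injection point
    // ... write the real response here ...
  }
}

class FaultInjectionExample {
  public static void main(String[] args) {
    FaultInjector prev = FaultInjector.instance;
    FaultInjector.instance = new FaultInjector() {
      @Override
      public void beforeSendResponse() throws IOException {
        throw new IOException("injected error");
      }
    };
    try {
      new Server().sendResponse();
    } catch (IOException e) {
      System.out.println("caught injected failure: " + e.getMessage());
    } finally {
      FaultInjector.instance = prev;  // always restore the real injector
    }
  }
}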

File: DataXceiver.java

@@ -420,6 +420,7 @@ private void sendShmErrorResponse(Status status, String error)

   private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo)
       throws IOException {
+    DataNodeFaultInjector.get().sendShortCircuitShmResponse();
     ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
         setId(PBHelper.convert(shmInfo.shmId)).build().
         writeDelimitedTo(socketOut);
@@ -478,10 +479,19 @@ public void requestShortCircuitShm(String clientName) throws IOException {
         }
       }
       if ((!success) && (peer == null)) {
-        // If we failed to pass the shared memory segment to the client,
-        // close the UNIX domain socket now. This will trigger the
-        // DomainSocketWatcher callback, cleaning up the segment.
-        IOUtils.cleanup(null, sock);
+        // The socket is now managed by the DomainSocketWatcher. However,
+        // we failed to pass it to the client. We call shutdown() on the
+        // UNIX domain socket now. This will trigger the DomainSocketWatcher
+        // callback. The callback will close the domain socket.
+        // We don't want to close the socket here, since that might lead to
+        // bad behavior inside the poll() call. See HADOOP-11802 for details.
+        try {
+          LOG.warn("Failed to send success response back to the client. " +
+              "Shutting down socket for " + shmInfo.shmId + ".");
+          sock.shutdown();
+        } catch (IOException e) {
+          LOG.warn("Failed to shut down socket in error handler", e);
+        }
       }
       IOUtils.cleanup(null, shmInfo);
     }
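The rewritten comment is the heart of the fix: the DataXceiver no longer close()s a socket that the DomainSocketWatcher's poll loop may still be watching; it calls shutdown() instead, so the watcher observes the hangup and performs the close itself. A minimal illustration of that division of responsibility, using plain java.net sockets in place of Hadoop's DomainSocket (the class name and watcher thread here are hypothetical):

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

// Sketch only: shutting a socket down (rather than closing it from another
// thread) lets the thread that is blocked reading or polling on it see the
// hangup and do the close itself. Plain TCP sockets stand in for Hadoop's
// UNIX domain sockets.
public class ShutdownVsCloseDemo {
  public static void main(String[] args) throws Exception {
    try (ServerSocket server = new ServerSocket(0)) {
      Socket client = new Socket("127.0.0.1", server.getLocalPort());
      Socket accepted = server.accept();

      // "Watcher" thread: owns the accepted socket and is the only one to close it.
      Thread watcher = new Thread(() -> {
        try (Socket s = accepted; InputStream in = s.getInputStream()) {
          if (in.read() == -1) {  // blocks until data or end-of-stream
            System.out.println("watcher: peer shut down, closing socket here");
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      });
      watcher.start();

      // Error path: instead of client.close() racing with the watcher's read,
      // shut down the write side so the watcher sees EOF and cleans up itself.
      client.shutdownOutput();
      watcher.join();
      client.close();
    }
  }
}

In the DataNode the same role is played by sock.shutdown(), which wakes the native poll() loop (now also watching for POLLHUP, per the DomainSocketWatcher.c change above); the watcher's callback then closes the descriptor.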

File: DfsClientShmManager.java

@@ -216,10 +216,11 @@ private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
    * Must be called with the EndpointShmManager lock held.
    *
    * @param peer          The peer to use to talk to the DataNode.
-   * @param clientName    The client name.
    * @param usedPeer      (out param) Will be set to true if we used the peer.
    *                        When a peer is used
    *
+   * @param clientName    The client name.
+   * @param blockId       The block ID to use.
    * @return              null if the DataNode does not support shared memory
    *                        segments, or experienced an error creating the
    *                        shm. The shared memory segment itself on success.

File: DomainSocketFactory.java

@@ -21,6 +21,7 @@
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;

+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -185,4 +186,9 @@ public void disableShortCircuitForPath(String path) {
   public void disableDomainSocketPath(String path) {
     pathMap.put(path, PathState.UNUSABLE);
   }
+
+  @VisibleForTesting
+  public void clearPathMap() {
+    pathMap.invalidateAll();
+  }
 }

File: TestShortCircuitCache.java

@@ -51,6 +51,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
 import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
@@ -69,6 +70,9 @@
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;

 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
@@ -619,6 +623,18 @@ public void visit(HashMap<DatanodeInfo,
     sockDir.close();
   }

+  static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
+      final int expectedSlots, ShortCircuitRegistry registry) {
+    registry.visit(new ShortCircuitRegistry.Visitor() {
+      @Override
+      public void accept(HashMap<ShmId, RegisteredShm> segments,
+          HashMultimap<ExtendedBlockId, Slot> slots) {
+        Assert.assertEquals(expectedSegments, segments.size());
+        Assert.assertEquals(expectedSlots, slots.size());
+      }
+    });
+  }
+
   public static class TestCleanupFailureInjector
       extends BlockReaderFactory.FailureInjector {
     @Override
@@ -663,16 +679,67 @@ public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
       GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
           "testing, but we failed to do a non-TCP read.", t);
     }
-    ShortCircuitRegistry registry =
-      cluster.getDataNodes().get(0).getShortCircuitRegistry();
-    registry.visit(new ShortCircuitRegistry.Visitor() {
-      @Override
-      public void accept(HashMap<ShmId, RegisteredShm> segments,
-                         HashMultimap<ExtendedBlockId, Slot> slots) {
-        Assert.assertEquals(1, segments.size());
-        Assert.assertEquals(1, slots.size());
-      }
-    });
+    checkNumberOfSegmentsAndSlots(1, 1,
+        cluster.getDataNodes().get(0).getShortCircuitRegistry());
+    cluster.shutdown();
+    sockDir.close();
+  }
+
+  // Regression test for HADOOP-11802
+  @Test(timeout=60000)
+  public void testDataXceiverHandlesRequestShortCircuitShmFailure()
+      throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir);
+    conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
+        1000000000L);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final Path TEST_PATH1 = new Path("/test_file1");
+    DFSTestUtil.createFile(fs, TEST_PATH1, 4096,
+        (short)1, 0xFADE1);
+    LOG.info("Setting failure injector and performing a read which " +
+        "should fail...");
+    DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class);
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        throw new IOException("injected error into sendShmResponse");
+      }
+    }).when(failureInjector).sendShortCircuitShmResponse();
+    DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance;
+    DataNodeFaultInjector.instance = failureInjector;
+    try {
+      // The first read will try to allocate a shared memory segment and slot.
+      // The shared memory segment allocation will fail because of the failure
+      // injector.
+      DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
+      Assert.fail("expected readFileBuffer to fail, but it succeeded.");
+    } catch (Throwable t) {
+      GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
+          "testing, but we failed to do a non-TCP read.", t);
+    }
+    checkNumberOfSegmentsAndSlots(0, 0,
+        cluster.getDataNodes().get(0).getShortCircuitRegistry());
+    LOG.info("Clearing failure injector and performing another read...");
+    DataNodeFaultInjector.instance = prevInjector;
+    fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
+    // The second read should succeed.
+    DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
+    // We should have added a new short-circuit shared memory segment and slot.
+    checkNumberOfSegmentsAndSlots(1, 1,
+        cluster.getDataNodes().get(0).getShortCircuitRegistry());
     cluster.shutdown();
     sockDir.close();
   }