diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 148726bd0e..1bbe91c625 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1768,6 +1768,9 @@ Release 0.22.0 - Unreleased HDFS-2491. TestBalancer can fail when datanode utilization and avgUtilization is exactly same. (Uma Maheswara Rao G via shv) + HDFS-2452. OutOfMemoryError in DataXceiverServer takes down the DataNode + (Uma Maheswara Rao via cos) + Release 0.21.1 - Unreleased HDFS-1466. TestFcHdfsSymlink relies on /tmp/test not existing. (eli) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index fdcdc18a34..2d3c49a3c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -92,7 +92,6 @@ public DataXceiver(Socket s, DataNode datanode, this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); this.datanode = datanode; this.dataXceiverServer = dataXceiverServer; - dataXceiverServer.childSockets.put(s, s); remoteAddress = s.getRemoteSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString(); @@ -129,6 +128,7 @@ private void updateCurrentThreadName(String status) { public void run() { int opsProcessed = 0; Op op = null; + dataXceiverServer.childSockets.put(s, s); try { int stdTimeout = s.getSoTimeout(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index f192747db5..c0d782a5c7 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -30,7 +30,6 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.balancer.Balancer; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -132,17 +131,12 @@ synchronized void release() { @Override public void run() { while (datanode.shouldRun) { + Socket s = null; try { - Socket s = ss.accept(); + s = ss.accept(); s.setTcpNoDelay(true); - final DataXceiver exciver; - try { - exciver = new DataXceiver(s, datanode, this); - } catch(IOException e) { - IOUtils.closeSocket(s); - throw e; - } - new Daemon(datanode.threadGroup, exciver).start(); + new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this)) + .start(); } catch (SocketTimeoutException ignored) { // wake up to see if should continue to run } catch (AsynchronousCloseException ace) { @@ -152,7 +146,19 @@ public void run() { LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ace); } } catch (IOException ie) { + IOUtils.closeSocket(s); LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ie); + } catch (OutOfMemoryError ie) { + IOUtils.closeSocket(s); + // DataNode can run out of memory if there are too many transfers. + // Log the event, sleep for 30 seconds; other transfers may complete by + // then. + LOG.warn("DataNode is out of memory. Will retry in 30 seconds.", ie);
+ try { + Thread.sleep(30 * 1000); + } catch (InterruptedException e) { + // ignore + } } catch (Throwable te) { LOG.error(datanode.getMachineName() + ":DataXceiverServer: Exiting due to: ", te); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataXceiverAspects.aj b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataXceiverAspects.aj new file mode 100644 index 0000000000..c762e32385 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataXceiverAspects.aj @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This aspect takes care of faults injected into datanode.DataXceiver + * class + */ +privileged public aspect DataXceiverAspects { + public static final Log LOG = LogFactory.getLog(DataXceiverAspects.class); + + pointcut runXceiverThread(DataXceiver xceiver) : + execution (* run(..)) && target(xceiver); + + void around (DataXceiver xceiver) : runXceiverThread(xceiver) { + if ("true".equals(System.getProperty("fi.enabledOOM"))) { + LOG.info("fi.enabledOOM is enabled"); + throw new OutOfMemoryError("Pretend there's no more memory"); + } else { + proceed(xceiver); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataXceiverServer.java new file mode 100644 index 0000000000..2f92fcf6ec --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataXceiverServer.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * This is a test for DataXceiverServer when DataXceiver thread spawning + * fails due to OutOfMemoryError. Expected behavior is that DataXceiverServer + * should not exit. It should retry after 30 seconds + */ +public class TestFiDataXceiverServer { + + @Test(timeout = 30000) + public void testOutOfMemoryErrorInDataXceiverServerRun() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + ServerSocket sock = new ServerSocket() { + @Override + public Socket accept() throws IOException { + return new Socket() { + @Override + public InetAddress getInetAddress() { + return super.getLocalAddress(); + } + + @Override + public SocketAddress getRemoteSocketAddress() { + return new InetSocketAddress(8080); + } + + @Override + public SocketAddress getLocalSocketAddress() { + return new InetSocketAddress(0); + } + + @Override + public synchronized void close() throws IOException { + latch.countDown(); + super.close(); + } + + @Override + public InputStream getInputStream() throws IOException { + return null; + } + }; + } + }; + Thread thread = null; + System.setProperty("fi.enabledOOM", "true"); + DataNode dn = Mockito.mock(DataNode.class); + try { + Configuration conf = new Configuration(); + Mockito.doReturn(conf).when(dn).getConf(); + dn.shouldRun = true; + DataXceiverServer server = new DataXceiverServer(sock, conf, dn); + thread = new Thread(server); + thread.start();
+ latch.await(); + assertTrue("Not running the thread", thread.isAlive()); + } finally { + System.setProperty("fi.enabledOOM", "false"); + dn.shouldRun = false; + if (null != thread) + thread.interrupt(); + sock.close(); + } + } +} \ No newline at end of file