diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 25505fa501..b6911dda42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -295,6 +295,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7165. Separate block metrics for files with replication count 1.
     (Zhe Zhang via wang)
 
+    HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 31ac80b5ed..478099deee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -21,6 +21,8 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
+import java.io.IOException;
+
 /**
  * Used for injecting faults in DFSClient and DFSOutputStream tests.
  * Calls into this are a no-op in production code.
@@ -35,4 +37,6 @@ public static DataNodeFaultInjector get() {
   }
 
   public void getHdfsBlocksMetadata() {}
+
+  public void writeBlockAfterFlush() throws IOException {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 67eb941811..6fc819de68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -211,6 +211,7 @@ public void run() {
             LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
           }
         } else {
+          datanode.metrics.incrDatanodeNetworkErrors();
           throw err;
         }
         break;
@@ -500,6 +501,7 @@ public void readBlock(final ExtendedBlock block,
         } catch (IOException ioe) {
           LOG.debug("Error reading client status response. Will close connection.", ioe);
           IOUtils.closeStream(out);
+          datanode.metrics.incrDatanodeNetworkErrors();
         }
       } else {
         IOUtils.closeStream(out);
@@ -520,6 +522,7 @@ public void readBlock(final ExtendedBlock block,
        */
       LOG.warn(dnR + ":Got exception while serving " + block + " to " +
           remoteAddress, ioe);
+      datanode.metrics.incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       IOUtils.closeStream(blockSender);
@@ -657,6 +660,8 @@ public void writeBlock(final ExtendedBlock block,
 
           mirrorOut.flush();
 
+          DataNodeFaultInjector.get().writeBlockAfterFlush();
+
           // read connect ack (only for clients, not for replication req)
           if (isClient) {
             BlockOpResponseProto connectAck =
@@ -695,6 +700,7 @@ public void writeBlock(final ExtendedBlock block,
           LOG.info(datanode + ":Exception transfering " +
                    block + " to mirror " + mirrorNode +
                    "- continuing without the mirror", e);
+          datanode.metrics.incrDatanodeNetworkErrors();
         }
       }
     }
@@ -749,6 +755,7 @@ public void writeBlock(final ExtendedBlock block,
 
     } catch (IOException ioe) {
       LOG.info("opWriteBlock " + block + " received exception " + ioe);
+      datanode.metrics.incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       // close all opened streams
@@ -782,6 +789,10 @@ public void transferBlock(final ExtendedBlock blk,
       datanode.transferReplicaForPipelineRecovery(blk, targets,
           targetStorageTypes, clientName);
       writeResponse(Status.SUCCESS, null, out);
+    } catch (IOException ioe) {
+      LOG.info("transferBlock " + blk + " received exception " + ioe);
+      datanode.metrics.incrDatanodeNetworkErrors();
+      throw ioe;
     } finally {
       IOUtils.closeStream(out);
     }
@@ -873,6 +884,10 @@ public void blockChecksum(final ExtendedBlock block,
         .build()
         .writeDelimitedTo(out);
       out.flush();
+    } catch (IOException ioe) {
+      LOG.info("blockChecksum " + block + " received exception " + ioe);
+      datanode.metrics.incrDatanodeNetworkErrors();
+      throw ioe;
     } finally {
       IOUtils.closeStream(out);
       IOUtils.closeStream(checksumIn);
@@ -938,6 +953,7 @@ public void copyBlock(final ExtendedBlock block,
     } catch (IOException ioe) {
       isOpSuccess = false;
       LOG.info("opCopyBlock " + block + " received exception " + ioe);
+      datanode.metrics.incrDatanodeNetworkErrors();
       throw ioe;
     } finally {
       dataXceiverServer.balanceThrottler.release();
@@ -995,6 +1011,7 @@ public void replaceBlock(final ExtendedBlock block,
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     DataOutputStream replyOut = new DataOutputStream(getOutputStream());
+    boolean IoeDuringCopyBlockOperation = false;
     try {
       // get the output stream to the proxy
       final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1022,7 +1039,9 @@ public void replaceBlock(final ExtendedBlock block,
           HdfsConstants.IO_FILE_BUFFER_SIZE));
 
       /* send request to the proxy */
+      IoeDuringCopyBlockOperation = true;
       new Sender(proxyOut).copyBlock(block, blockToken);
+      IoeDuringCopyBlockOperation = false;
 
       // receive the response from the proxy
 
@@ -1065,6 +1084,10 @@ public void replaceBlock(final ExtendedBlock block,
       opStatus = ERROR;
       errMsg = "opReplaceBlock " + block + " received exception " + ioe;
       LOG.info(errMsg);
+      if (!IoeDuringCopyBlockOperation) {
+        // Don't double count IO errors
+        datanode.metrics.incrDatanodeNetworkErrors();
+      }
       throw ioe;
     } finally {
       // receive the last byte that indicates the proxy released its thread resource
@@ -1083,6 +1106,7 @@ public void replaceBlock(final ExtendedBlock block,
       sendResponse(opStatus, errMsg);
     } catch (IOException ioe) {
       LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
+      datanode.metrics.incrDatanodeNetworkErrors();
     }
     IOUtils.closeStream(proxyOut);
     IOUtils.closeStream(blockReceiver);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 57f12db3b5..09ad3da642 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -89,6 +89,9 @@ public class DataNodeMetrics {
 
   @Metric MutableCounterLong volumeFailures;
 
+  @Metric("Count of network errors on the datanode")
+  MutableCounterLong datanodeNetworkErrors;
+
   @Metric MutableRate readBlockOp;
   @Metric MutableRate writeBlockOp;
   @Metric MutableRate blockChecksumOp;
@@ -296,6 +299,10 @@ public void incrVolumeFailures() {
     volumeFailures.incr();
   }
 
+  public void incrDatanodeNetworkErrors() {
+    datanodeNetworkErrors.incr();
+  }
+
   /** Increment for getBlockLocalPathInfo calls */
   public void incrBlocksGetLocalPathInfo() {
     blocksGetLocalPathInfo.incr();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 9b90d41198..90112af245 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -25,8 +25,13 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,10 +43,13 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestDataNodeMetrics {
+  private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
 
   @Test
   public void testDataNodeMetrics() throws Exception {
@@ -186,4 +194,38 @@ public void testRoundTripAckMetric() throws Exception {
       }
     }
   }
+
+  @Test(timeout=60000)
+  public void testTimeoutMetric() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final Path path = new Path("/test");
+
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+
+    final List<FSDataOutputStream> streams = Lists.newArrayList();
+    try {
+      final FSDataOutputStream out =
+          cluster.getFileSystem().create(path, (short) 2);
+      final DataNodeFaultInjector injector = Mockito.mock
+          (DataNodeFaultInjector.class);
+      Mockito.doThrow(new IOException("mock IOException")).
+          when(injector).
+          writeBlockAfterFlush();
+      DataNodeFaultInjector.instance = injector;
+      streams.add(out);
+      out.writeBytes("old gs data\n");
+      out.hflush();
+
+      final MetricsRecordBuilder dnMetrics =
+          getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+      assertCounter("DatanodeNetworkErrors", 1L, dnMetrics);
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      DataNodeFaultInjector.instance = new DataNodeFaultInjector();
+    }
+  }
 }
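
Side note, not part of the patch: the metrics2 system publishes every @Metric field as a JMX attribute, so the counter the test asserts ("DatanodeNetworkErrors") is also visible on a live DataNode through the daemon's /jmx servlet, no fault injector required. The probe below is a minimal sketch under assumptions to adjust per deployment: localhost, the Hadoop 2.x default DataNode HTTP port 50075, and the DataNodeActivity-* bean name that DataNodeMetrics registers; the class name DatanodeNetworkErrorsProbe is invented for this example.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
 * Standalone probe that polls a DataNode's /jmx servlet and prints the
 * DatanodeNetworkErrors counter. Kept dependency-free by scanning the JSON
 * response line by line instead of pulling in a JSON parser.
 */
public class DatanodeNetworkErrorsProbe {
  public static void main(String[] args) throws Exception {
    // Query only the DataNodeActivity bean; its name embeds the hostname
    // and xfer port, so a wildcard pattern avoids hard-coding them.
    URL url = new URL("http://localhost:50075/jmx"
        + "?qry=Hadoop:service=DataNode,name=DataNodeActivity-*");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        if (line.contains("DatanodeNetworkErrors")) {
          // Prints something like: "DatanodeNetworkErrors" : 0
          System.out.println(line.trim());
        }
      }
    }
  }
}

A healthy pipeline should report 0 here; the unit test above drives the value to 1 by making writeBlockAfterFlush() throw, which DataXceiver#writeBlock counts as a mirror connection failure.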