diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0c7f686f41..20881ea645 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -844,6 +844,9 @@ Release 2.0.5-beta - UNRELEASED HDFS-4804. WARN when users set the block balanced preference percent below 0.5 or above 1.0. (Stephen Chu via atm) + HDFS-4698. Provide client-side metrics for remote reads, local reads, and + short-circuit reads. (Colin Patrick McCabe via atm) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c index aa3db3d4af..deb11ef304 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_web.c @@ -1197,6 +1197,24 @@ int hdfsFileIsOpenForRead(hdfsFile file) return (file->type == INPUT); } +int hdfsFileGetReadStatistics(hdfsFile file, + struct hdfsReadStatistics **stats) +{ + errno = ENOTSUP; + return -1; +} + +int64_t hdfsReadStatisticsGetRemoteBytesRead( + const struct hdfsReadStatistics *stats) +{ + return stats->totalBytesRead - stats->totalLocalBytesRead; +} + +void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) +{ + free(stats); +} + int hdfsFileIsOpenForWrite(hdfsFile file) { return (file->type == OUTPUT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index 5f3a507572..e1e40c0191 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -70,4 +70,15 @@ public interface BlockReader extends ByteBufferReadable { * filled or the next call will return EOF. */ int readAll(byte[] buf, int offset, int len) throws IOException; + + /** + * @return true only if this is a local read. + */ + boolean isLocal(); + + /** + * @return true only if this is a short-circuit read. + * All short-circuit reads are also local. + */ + boolean isShortCircuit(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index 5a55708937..86ac6cb357 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -531,4 +531,14 @@ public int available() throws IOException { // We never do network I/O in BlockReaderLocal. return Integer.MAX_VALUE; } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public boolean isShortCircuit() { + return true; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index 1d5a334dc5..acee9de49a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -700,4 +700,14 @@ public int available() throws IOException { // We never do network I/O in BlockReaderLocalLegacy. return Integer.MAX_VALUE; } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public boolean isShortCircuit() { + return true; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index a78b6c8303..56fc97bf04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -81,7 +81,74 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable private LocatedBlock currentLocatedBlock = null; private long pos = 0; private long blockEnd = -1; + private final ReadStatistics readStatistics = new ReadStatistics(); + public static class ReadStatistics { + public ReadStatistics() { + this.totalBytesRead = 0; + this.totalLocalBytesRead = 0; + this.totalShortCircuitBytesRead = 0; + } + + public ReadStatistics(ReadStatistics rhs) { + this.totalBytesRead = rhs.getTotalBytesRead(); + this.totalLocalBytesRead = rhs.getTotalLocalBytesRead(); + this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead(); + } + + /** + * @return The total bytes read. This will always be at least as + * high as the other numbers, since it includes all of them. + */ + public long getTotalBytesRead() { + return totalBytesRead; + } + + /** + * @return The total local bytes read. This will always be at least + * as high as totalShortCircuitBytesRead, since all short-circuit + * reads are also local. + */ + public long getTotalLocalBytesRead() { + return totalLocalBytesRead; + } + + /** + * @return The total short-circuit local bytes read. + */ + public long getTotalShortCircuitBytesRead() { + return totalShortCircuitBytesRead; + } + + /** + * @return The total number of bytes read which were not local. + */ + public long getRemoteBytesRead() { + return totalBytesRead - totalLocalBytesRead; + } + + void addRemoteBytes(long amt) { + this.totalBytesRead += amt; + } + + void addLocalBytes(long amt) { + this.totalBytesRead += amt; + this.totalLocalBytesRead += amt; + } + + void addShortCircuitBytes(long amt) { + this.totalBytesRead += amt; + this.totalLocalBytesRead += amt; + this.totalShortCircuitBytesRead += amt; + } + + private long totalBytesRead; + + private long totalLocalBytesRead; + + private long totalShortCircuitBytesRead; + } + private final FileInputStreamCache fileInputStreamCache; /** @@ -546,9 +613,25 @@ public synchronized int read() throws IOException { * strategy-agnostic. */ private interface ReaderStrategy { - public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException; + public int doRead(BlockReader blockReader, int off, int len, + ReadStatistics readStatistics) throws ChecksumException, IOException; } + private static void updateReadStatistics(ReadStatistics readStatistics, + int nRead, BlockReader blockReader) { + if (nRead <= 0) return; + if (blockReader.isShortCircuit()) { + readStatistics.totalBytesRead += nRead; + readStatistics.totalLocalBytesRead += nRead; + readStatistics.totalShortCircuitBytesRead += nRead; + } else if (blockReader.isLocal()) { + readStatistics.totalBytesRead += nRead; + readStatistics.totalLocalBytesRead += nRead; + } else { + readStatistics.totalBytesRead += nRead; + } + } + /** * Used to read bytes into a byte[] */ @@ -560,8 +643,11 @@ public ByteArrayStrategy(byte[] buf) { } @Override - public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException { - return blockReader.read(buf, off, len); + public int doRead(BlockReader blockReader, int off, int len, + ReadStatistics readStatistics) throws ChecksumException, IOException { + int nRead = blockReader.read(buf, off, len); + updateReadStatistics(readStatistics, nRead, blockReader); + return nRead; } } @@ -575,13 +661,15 @@ private static class ByteBufferStrategy implements ReaderStrategy { } @Override - public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException { + public int doRead(BlockReader blockReader, int off, int len, + ReadStatistics readStatistics) throws ChecksumException, IOException { int oldpos = buf.position(); int oldlimit = buf.limit(); boolean success = false; try { int ret = blockReader.read(buf); success = true; + updateReadStatistics(readStatistics, ret, blockReader); return ret; } finally { if (!success) { @@ -613,7 +701,7 @@ private synchronized int readBuffer(ReaderStrategy reader, int off, int len, while (true) { // retry as many times as seekToNewSource allows. try { - return reader.doRead(blockReader, off, len); + return reader.doRead(blockReader, off, len, readStatistics); } catch ( ChecksumException ce ) { DFSClient.LOG.warn("Found Checksum error for " + getCurrentBlock() + " from " + currentNode @@ -1275,4 +1363,11 @@ static class DNAddrPair { this.addr = addr; } } + + /** + * Get statistics about the reads which this DFSInputStream has done. + */ + public synchronized ReadStatistics getReadStatistics() { + return new ReadStatistics(readStatistics); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index d391728c88..9476b80a37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -78,6 +79,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { * at the beginning so that the read can begin on a chunk boundary. */ private final long bytesNeededToFinish; + + /** + * True if we are reading from a local DataNode. + */ + private final boolean isLocal; private boolean eos = false; private boolean sentStatusCode = false; @@ -329,6 +335,9 @@ private RemoteBlockReader(String file, String bpid, long blockId, checksum.getChecksumSize() > 0? checksum : null, checksum.getBytesPerChecksum(), checksum.getChecksumSize()); + + this.isLocal = DFSClient.isLocalAddress(NetUtils. + createSocketAddr(datanodeID.getXferAddr())); this.peer = peer; this.datanodeID = datanodeID; @@ -477,4 +486,14 @@ public int available() throws IOException { // to us without doing network I/O. return DFSClient.TCP_WINDOW_SIZE; } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public boolean isShortCircuit() { + return false; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 01d83eaeeb..1ba6b55da8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -106,6 +107,11 @@ public class RemoteBlockReader2 implements BlockReader { */ private long bytesNeededToFinish; + /** + * True if we are reading from a local DataNode. + */ + private final boolean isLocal; + private final boolean verifyChecksum; private boolean sentStatusCode = false; @@ -255,6 +261,8 @@ protected RemoteBlockReader2(String file, String bpid, long blockId, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, DatanodeID datanodeID, PeerCache peerCache) { + this.isLocal = DFSClient.isLocalAddress(NetUtils. + createSocketAddr(datanodeID.getXferAddr())); // Path is used only for printing block and file information in debug this.peer = peer; this.datanodeID = datanodeID; @@ -431,4 +439,14 @@ public int available() throws IOException { // to us without doing network I/O. return DFSClient.TCP_WINDOW_SIZE; } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public boolean isShortCircuit() { + return false; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java index e07964a5a5..9ed895ed7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java @@ -68,4 +68,14 @@ public synchronized List getAllBlocks() throws IOException { public long getVisibleLength() throws IOException { return ((DFSInputStream) in).getFileLength(); } + + /** + * Get statistics about the reads which this DFSInputStream has done. + * Note that because HdfsDataInputStream is buffered, these stats may + * be higher than you would expect just by adding up the number of + * bytes read through HdfsDataInputStream. + */ + public synchronized DFSInputStream.ReadStatistics getReadStatistics() { + return ((DFSInputStream) in).getReadStatistics(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index f547415dcb..3b85820d5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -214,7 +214,7 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId, new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, offsetIntoBlock, amtToRead, true, "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), - new DatanodeID(addr.getAddress().toString(), + new DatanodeID(addr.getAddress().getHostAddress(), addr.getHostName(), poolId, addr.getPort(), 0, 0), null, null, null, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c index ba980a7a53..2782434769 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c @@ -81,6 +81,93 @@ int hdfsFileIsOpenForRead(hdfsFile file) return (file->type == INPUT); } +int hdfsFileGetReadStatistics(hdfsFile file, + struct hdfsReadStatistics **stats) +{ + jthrowable jthr; + jobject readStats = NULL; + jvalue jVal; + struct hdfsReadStatistics *s = NULL; + int ret; + JNIEnv* env = getJNIEnv(); + + if (env == NULL) { + errno = EINTERNAL; + return -1; + } + if (file->type != INPUT) { + ret = EINVAL; + goto done; + } + jthr = invokeMethod(env, &jVal, INSTANCE, file->file, + "org/apache/hadoop/hdfs/client/HdfsDataInputStream", + "getReadStatistics", + "()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsFileGetReadStatistics: getReadStatistics failed"); + goto done; + } + readStats = jVal.l; + s = malloc(sizeof(struct hdfsReadStatistics)); + if (!s) { + ret = ENOMEM; + goto done; + } + jthr = invokeMethod(env, &jVal, INSTANCE, readStats, + "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", + "getTotalBytesRead", "()J"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsFileGetReadStatistics: getTotalBytesRead failed"); + goto done; + } + s->totalBytesRead = jVal.j; + + jthr = invokeMethod(env, &jVal, INSTANCE, readStats, + "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", + "getTotalLocalBytesRead", "()J"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsFileGetReadStatistics: getTotalLocalBytesRead failed"); + goto done; + } + s->totalLocalBytesRead = jVal.j; + + jthr = invokeMethod(env, &jVal, INSTANCE, readStats, + "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", + "getTotalShortCircuitBytesRead", "()J"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed"); + goto done; + } + s->totalShortCircuitBytesRead = jVal.j; + *stats = s; + s = NULL; + ret = 0; + +done: + destroyLocalReference(env, readStats); + free(s); + if (ret) { + errno = ret; + return -1; + } + return 0; +} + +int64_t hdfsReadStatisticsGetRemoteBytesRead( + const struct hdfsReadStatistics *stats) +{ + return stats->totalBytesRead - stats->totalLocalBytesRead; +} + +void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) +{ + free(stats); +} + int hdfsFileIsOpenForWrite(hdfsFile file) { return (file->type == OUTPUT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h index 7973e0a5e3..1871665955 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h @@ -81,6 +81,43 @@ extern "C" { */ int hdfsFileIsOpenForWrite(hdfsFile file); + struct hdfsReadStatistics { + uint64_t totalBytesRead; + uint64_t totalLocalBytesRead; + uint64_t totalShortCircuitBytesRead; + }; + + /** + * Get read statistics about a file. This is only applicable to files + * opened for reading. + * + * @param file The HDFS file + * @param stats (out parameter) on a successful return, the read + * statistics. Unchanged otherwise. You must free the + * returned statistics with hdfsFileFreeReadStatistics. + * @return 0 if the statistics were successfully returned, + * -1 otherwise. On a failure, please check errno against + * ENOTSUP. webhdfs, LocalFilesystem, and so forth may + * not support read statistics. + */ + int hdfsFileGetReadStatistics(hdfsFile file, + struct hdfsReadStatistics **stats); + + /** + * @param stats HDFS read statistics for a file. + * + * @return the number of remote bytes read. + */ + int64_t hdfsReadStatisticsGetRemoteBytesRead( + const struct hdfsReadStatistics *stats); + + /** + * Free some HDFS read statistics. + * + * @param stats The HDFS read statistics to free. + */ + void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats); + /** * hdfsConnectAsUser - Connect to a hdfs file system as a specific user * Connect to the hdfs. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c index c56c89300f..cf2bf96392 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c @@ -116,6 +116,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs) hdfsFile file; int ret, expected; hdfsFileInfo *fileInfo; + struct hdfsReadStatistics *readStats = NULL; snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx); @@ -157,6 +158,12 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs) file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0); EXPECT_NONNULL(file); + EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats)); + errno = 0; + EXPECT_ZERO(readStats->totalBytesRead); + EXPECT_ZERO(readStats->totalLocalBytesRead); + EXPECT_ZERO(readStats->totalShortCircuitBytesRead); + hdfsFileFreeReadStatistics(readStats); /* TODO: implement readFully and use it here */ ret = hdfsRead(fs, file, tmp, sizeof(tmp)); if (ret < 0) { @@ -169,6 +176,10 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs) "it read %d\n", ret, expected); return EIO; } + EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats)); + errno = 0; + EXPECT_INT_EQ(expected, readStats->totalBytesRead); + hdfsFileFreeReadStatistics(readStats); EXPECT_ZERO(memcmp(prefix, tmp, expected)); EXPECT_ZERO(hdfsCloseFile(fs, file)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java index 704a5421a7..80cffa048e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java @@ -25,14 +25,19 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; public class TestBlockReaderLocal { @@ -339,11 +344,81 @@ public void doTest(BlockReaderLocal reader, byte original[]) } } } - + @Test public void testBlockReaderLocalReadCorrupt() throws IOException { runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true); runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false); } + + @Test(timeout=60000) + public void TestStatisticsForShortCircuitLocalRead() throws Exception { + testStatistics(true); + } + + @Test(timeout=60000) + public void TestStatisticsForLocalRead() throws Exception { + testStatistics(false); + } + + private void testStatistics(boolean isShortCircuit) throws Exception { + Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null); + HdfsConfiguration conf = new HdfsConfiguration(); + TemporarySocketDirectory sockDir = null; + if (isShortCircuit) { + DFSInputStream.tcpReadsDisabledForTesting = true; + sockDir = new TemporarySocketDirectory(); + conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, + new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock"). + getAbsolutePath()); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + DomainSocket.disableBindPathValidation(); + } else { + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); + } + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + final long RANDOM_SEED = 4567L; + FSDataInputStream fsIn = null; + byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + IOUtils.readFully(fsIn, original, 0, + BlockReaderLocalTest.TEST_LENGTH); + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, + dfsIn.getReadStatistics().getTotalLocalBytesRead()); + if (isShortCircuit) { + Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, + dfsIn.getReadStatistics().getTotalShortCircuitBytesRead()); + } else { + Assert.assertEquals(0, + dfsIn.getReadStatistics().getTotalShortCircuitBytesRead()); + } + fsIn.close(); + fsIn = null; + } finally { + DFSInputStream.tcpReadsDisabledForTesting = false; + if (fsIn != null) fsIn.close(); + if (cluster != null) cluster.shutdown(); + if (sockDir != null) sockDir.close(); + } + } }