diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java index ea52c76e16..911486f1a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java @@ -39,6 +39,8 @@ public class CBlockTargetMetrics { @Metric private MutableCounterLong numReadCacheHits; @Metric private MutableCounterLong numReadCacheMiss; @Metric private MutableCounterLong numReadLostBlocks; + @Metric private MutableCounterLong numDirectBlockWrites; + @Metric private MutableCounterLong numFailedDirectBlockWrites; @Metric private MutableRate dbReadLatency; @Metric private MutableRate containerReadLatency; @@ -46,6 +48,8 @@ public class CBlockTargetMetrics { @Metric private MutableRate dbWriteLatency; @Metric private MutableRate containerWriteLatency; + @Metric private MutableRate directBlockWriteLatency; + public CBlockTargetMetrics() { } @@ -76,6 +80,14 @@ public void incNumReadLostBlocks() { numReadLostBlocks.incr(); } + public void incNumDirectBlockWrites() { + numDirectBlockWrites.incr(); + } + + public void incNumFailedDirectBlockWrites() { + numFailedDirectBlockWrites.incr(); + } + public void updateDBReadLatency(long latency) { dbReadLatency.add(latency); } @@ -92,6 +104,10 @@ public void updateContainerWriteLatency(long latency) { containerWriteLatency.add(latency); } + public void updateDirectBlockWriteLatency(long latency) { + directBlockWriteLatency.add(latency); + } + @VisibleForTesting public long getNumReadOps() { return numReadOps.value(); @@ -116,4 +132,14 @@ public long getNumReadCacheMiss() { public long getNumReadLostBlocks() { return numReadLostBlocks.value(); } + + @VisibleForTesting + public long getNumDirectBlockWrites() { + return numDirectBlockWrites.value(); + } + + @VisibleForTesting + public long getNumFailedDirectBlockWrites() { + return numFailedDirectBlockWrites.value(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java index 8c3b3ff538..1aee0a796a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java @@ -23,6 +23,9 @@ import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.LevelDBStore; import org.slf4j.Logger; @@ -167,10 +170,33 @@ public void writeBlock(LogicalBlock block) throws IOException { } block.clearData(); } else { - // TODO : Support Direct I/O - LOG.error("Non-Cache I/O is not supported at this point of time."); - throw new IllegalStateException("Cache is required and cannot be " + - "disabled now."); + Pipeline pipeline = parentCache.getPipeline(block.getBlockID()); + String containerName = pipeline.getContainerName(); + try { + long startTime = Time.monotonicNow(); + XceiverClientSpi client = parentCache.getClientManager() + .acquireClient(parentCache.getPipeline(block.getBlockID())); + // BUG: fix the trace ID. + ContainerProtocolCalls.writeSmallFile(client, containerName, + Long.toString(block.getBlockID()), block.getData().array(), ""); + long endTime = Time.monotonicNow(); + if (parentCache.isTraceEnabled()) { + String datahash = DigestUtils.sha256Hex(block.getData().array()); + parentCache.getTracer().info( + "Task=DirectWriterPut,BlockID={},Time={},SHA={}", + block.getBlockID(), endTime - startTime, datahash); + } + parentCache.getTargetMetrics(). + updateDirectBlockWriteLatency(endTime - startTime); + parentCache.getTargetMetrics().incNumDirectBlockWrites(); + } catch (Exception ex) { + parentCache.getTargetMetrics().incNumFailedDirectBlockWrites(); + LOG.error("Direct I/O writing of block:{} to container {} failed", + block.getBlockID(), containerName, ex); + throw ex; + } finally { + block.clearData(); + } } if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) { long startTime = Time.monotonicNow(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java index c8d2aadc11..02aef137cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java @@ -428,4 +428,63 @@ public void testStorageImplNoLocalCache() throws IOException, 100, 20 * 1000); ozoneStore.close(); } + + /** + * This test creates a cache and performs a simple write / read. + * The operations are done by bypassing the cache. + * + * @throws IOException + */ + @Test + public void testDirectIO() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration cConfig = new OzoneConfiguration(); + cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false); + cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + final long blockID = 0; + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + String dataHash = DigestUtils.sha256Hex(data); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(cConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(getContainerPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + Assert.assertFalse(cache.isShortCircuitIOEnabled()); + cache.put(blockID, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(1, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(1, metrics.getNumWriteOps()); + // Please note that this read is directly from remote container + LogicalBlock block = cache.get(blockID); + Assert.assertEquals(1, metrics.getNumReadOps()); + Assert.assertEquals(0, metrics.getNumReadCacheHits()); + Assert.assertEquals(1, metrics.getNumReadCacheMiss()); + Assert.assertEquals(0, metrics.getNumReadLostBlocks()); + Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); + + cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(2, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(2, metrics.getNumWriteOps()); + Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites()); + // Please note that this read is directly from remote container + block = cache.get(blockID + 1); + Assert.assertEquals(2, metrics.getNumReadOps()); + Assert.assertEquals(0, metrics.getNumReadCacheHits()); + Assert.assertEquals(2, metrics.getNumReadCacheMiss()); + Assert.assertEquals(0, metrics.getNumReadLostBlocks()); + String readHash = DigestUtils.sha256Hex(block.getData().array()); + Assert.assertEquals("File content does not match.", dataHash, readHash); + GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); + cache.close(); + } }