From 2091d1a4af6e59f49f6f6d75895e78d9148afed6 Mon Sep 17 00:00:00 2001
From: Bharat Viswanadham
Date: Wed, 9 Jan 2019 20:02:36 -0800
Subject: [PATCH] HDDS-941. Rename ChunkGroupInputStream to KeyInputStream and
 ChunkInputStream to BlockInputStream. Contributed by Shashikant Banerjee.

---
 ...InputStream.java => BlockInputStream.java} | 10 ++---
 ...upInputStream.java => KeyInputStream.java} | 40 +++++++++----------
 .../ozone/client/io/KeyOutputStream.java      |  2 +-
 .../ozone/client/io/OzoneInputStream.java     |  6 +--
 .../hadoop/ozone/client/rpc/RpcClient.java    |  4 +-
 .../storage/DistributedStorageHandler.java    |  4 +-
 .../hadoop/ozone/om/TestChunkStreams.java     | 18 ++++-----
 .../hadoop/fs/ozone/OzoneFSInputStream.java   |  6 +--
 .../ozone/s3/io/S3WrapperInputStream.java     | 10 ++---
 9 files changed, 50 insertions(+), 50 deletions(-)
 rename hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/{ChunkInputStream.java => BlockInputStream.java} (97%)
 rename hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/{ChunkGroupInputStream.java => KeyInputStream.java} (90%)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
similarity index 97%
rename from hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 2e24acacfb..ddd01d3083 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -46,7 +46,7 @@
  * instances. This class encapsulates all state management for iterating
  * through the sequence of chunks and the sequence of buffers within each chunk.
  */
-public class ChunkInputStream extends InputStream implements Seekable {
+public class BlockInputStream extends InputStream implements Seekable {

   private static final int EOF = -1;

@@ -61,7 +61,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
   private int bufferIndex;

   /**
-   * Creates a new ChunkInputStream.
+   * Creates a new BlockInputStream.
    *
    * @param blockID block ID of the chunk
    * @param xceiverClientManager client manager that controls client
@@ -69,7 +69,7 @@
    * @param chunks list of chunks to read
    * @param traceID container protocol call traceID
    */
-  public ChunkInputStream(
+  public BlockInputStream(
       BlockID blockID, XceiverClientManager xceiverClientManager,
       XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
     this.blockID = blockID;
@@ -79,7 +79,7 @@ public ChunkInputStream(
     this.chunks = chunks;
     this.chunkIndex = -1;
     // chunkOffset[i] stores offset at which chunk i stores data in
-    // ChunkInputStream
+    // BlockInputStream
     this.chunkOffset = new long[this.chunks.size()];
     initializeChunkOffset();
     this.buffers = null;
@@ -154,7 +154,7 @@ public synchronized void close() {
    */
   private synchronized void checkOpen() throws IOException {
     if (xceiverClient == null) {
-      throw new IOException("ChunkInputStream has been closed.");
+      throw new IOException("BlockInputStream has been closed.");
     }
   }

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
similarity index 90%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
rename to hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 0cded71af6..99817fbbbc 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -30,7 +30,7 @@
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
@@ -44,17 +44,17 @@
 import java.util.List;

 /**
- * Maintaining a list of ChunkInputStream. Read based on offset.
+ * Maintaining a list of BlockInputStream. Read based on offset.
  */
-public class ChunkGroupInputStream extends InputStream implements Seekable {
+public class KeyInputStream extends InputStream implements Seekable {

   private static final Logger LOG =
-      LoggerFactory.getLogger(ChunkGroupInputStream.class);
+      LoggerFactory.getLogger(KeyInputStream.class);

   private static final int EOF = -1;

   private final ArrayList<ChunkInputStreamEntry> streamEntries;
-  // streamOffset[i] stores the offset at which chunkInputStream i stores
+  // streamOffset[i] stores the offset at which blockInputStream i stores
   // data in the key
   private long[] streamOffset = null;
   private int currentStreamIndex;
@@ -62,7 +62,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
   private boolean closed = false;
   private String key;

-  public ChunkGroupInputStream() {
+  public KeyInputStream() {
     streamEntries = new ArrayList<>();
     currentStreamIndex = 0;
   }
@@ -84,7 +84,7 @@ public long getRemainingOfIndex(int index) throws IOException {
    * @param streamLength the max number of bytes that should be written to this
    * stream.
    */
-  public synchronized void addStream(ChunkInputStream stream,
+  public synchronized void addStream(BlockInputStream stream,
       long streamLength) {
     streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
   }
@@ -129,7 +129,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
         // this case.
         throw new IOException(String.format(
             "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
-            current.chunkInputStream.getBlockID(), current.length,
+            current.blockInputStream.getBlockID(), current.length,
             numBytesRead));
       }
       totalReadLen += numBytesRead;
@@ -174,7 +174,7 @@ public void seek(long pos) throws IOException {
       // accordingly so that currentStreamIndex = insertionPoint - 1
       currentStreamIndex = -currentStreamIndex - 2;
     }
-    // seek to the proper offset in the ChunkInputStream
+    // seek to the proper offset in the BlockInputStream
     streamEntries.get(currentStreamIndex)
         .seek(pos - streamOffset[currentStreamIndex]);
   }
@@ -207,17 +207,17 @@ public void close() throws IOException {
   }

   /**
-   * Encapsulates ChunkInputStream.
+   * Encapsulates BlockInputStream.
    */
   public static class ChunkInputStreamEntry extends InputStream
       implements Seekable {

-    private final ChunkInputStream chunkInputStream;
+    private final BlockInputStream blockInputStream;
     private final long length;

-    public ChunkInputStreamEntry(ChunkInputStream chunkInputStream,
+    public ChunkInputStreamEntry(BlockInputStream blockInputStream,
         long length) {
-      this.chunkInputStream = chunkInputStream;
+      this.blockInputStream = blockInputStream;
       this.length = length;
     }

@@ -228,29 +228,29 @@ synchronized long getRemaining() throws IOException {

     @Override
     public synchronized int read(byte[] b, int off, int len)
         throws IOException {
-      int readLen = chunkInputStream.read(b, off, len);
+      int readLen = blockInputStream.read(b, off, len);
       return readLen;
     }

     @Override
     public synchronized int read() throws IOException {
-      int data = chunkInputStream.read();
+      int data = blockInputStream.read();
       return data;
     }

     @Override
     public synchronized void close() throws IOException {
-      chunkInputStream.close();
+      blockInputStream.close();
     }

     @Override
     public void seek(long pos) throws IOException {
-      chunkInputStream.seek(pos);
+      blockInputStream.seek(pos);
     }

     @Override
     public long getPos() throws IOException {
-      return chunkInputStream.getPos();
+      return blockInputStream.getPos();
     }

     @Override
@@ -267,7 +267,7 @@ public static LengthInputStream getFromOmKeyInfo(
       String requestId) throws IOException {
     long length = 0;
     long containerKey;
-    ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
+    KeyInputStream groupInputStream = new KeyInputStream();
     groupInputStream.key = keyInfo.getKeyName();
     List<OmKeyLocationInfo> keyLocationInfos =
         keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
@@ -304,7 +304,7 @@ public static LengthInputStream getFromOmKeyInfo(
           length += chunk.getLen();
         }
         success = true;
-        ChunkInputStream inputStream = new ChunkInputStream(
+        BlockInputStream inputStream = new BlockInputStream(
             omKeyLocationInfo.getBlockID(), xceiverClientManager,
             xceiverClient, chunks, requestId);
         groupInputStream.addStream(inputStream,
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 1c82ef4f51..66e419906d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -50,7 +50,7 @@
 import java.util.concurrent.TimeoutException;

 /**
- * Maintaining a list of ChunkInputStream. Write based on offset.
+ * Maintaining a list of BlockInputStream. Write based on offset.
  *
  * Note that this may write to multiple containers in one write call. In case
  * that first container succeeded but later ones failed, the succeeded writes
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
index e1f65e69a8..92eb150810 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
@@ -17,21 +17,21 @@

 package org.apache.hadoop.ozone.client.io;

-import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;

 import java.io.IOException;
 import java.io.InputStream;

 /**
  * OzoneInputStream is used to read data from Ozone.
- * It uses SCM's {@link ChunkInputStream} for reading the data.
+ * It uses {@link KeyInputStream} for reading the data.
  */
 public class OzoneInputStream extends InputStream {

   private final InputStream inputStream;

   /**
-   * Constructs OzoneInputStream with ChunkInputStream.
+   * Constructs OzoneInputStream with KeyInputStream.
    *
    * @param inputStream
    */
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index b6fb6b5d02..bd80b3ec00 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -38,7 +38,7 @@
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.VolumeArgs;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -541,7 +541,7 @@ public OzoneInputStream getKey(
         .build();
     OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
     LengthInputStream lengthInputStream =
-        ChunkGroupInputStream.getFromOmKeyInfo(
+        KeyInputStream.getFromOmKeyInfo(
             keyInfo, xceiverClientManager, storageContainerLocationClient,
             requestId);
     return new OzoneInputStream(lengthInputStream.getWrappedStream());
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index a078c01066..b73f297054 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -38,7 +39,6 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
@@ -475,7 +475,7 @@ public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
         .setDataSize(args.getSize())
         .build();
     OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
-    return ChunkGroupInputStream.getFromOmKeyInfo(
+    return KeyInputStream.getFromOmKeyInfo(
         keyInfo, xceiverClientManager, storageContainerLocationClient,
         args.getRequestID());
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index 4ef46a362b..f3ab0934af 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -17,8 +17,8 @@
 package org.apache.hadoop.ozone.om;

 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
-import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -31,7 +31,7 @@
 import static org.junit.Assert.assertEquals;

 /**
- * This class tests ChunkGroupInputStream and KeyOutputStream.
+ * This class tests KeyInputStream and KeyOutputStream.
  */
 public class TestChunkStreams {

@@ -40,15 +40,15 @@ public class TestChunkStreams {

   @Test
   public void testReadGroupInputStream() throws Exception {
-    try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
+    try (KeyInputStream groupInputStream = new KeyInputStream()) {
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes(UTF_8);
       int offset = 0;
       for (int i = 0; i < 5; i++) {
         int tempOffset = offset;
-        ChunkInputStream in =
-            new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
+        BlockInputStream in =
+            new BlockInputStream(null, null, null, new ArrayList<>(), null) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);
@@ -96,15 +96,15 @@ public synchronized int read(byte[] b, int off, int len)

   @Test
   public void testErrorReadGroupInputStream() throws Exception {
-    try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
+    try (KeyInputStream groupInputStream = new KeyInputStream()) {
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes(UTF_8);
       int offset = 0;
       for (int i = 0; i < 5; i++) {
         int tempOffset = offset;
-        ChunkInputStream in =
-            new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
+        BlockInputStream in =
+            new BlockInputStream(null, null, null, new ArrayList<>(), null) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
index 4c5c0c8318..5df3cffa3d 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
@@ -21,7 +21,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;

 import java.io.IOException;
 import java.io.InputStream;
@@ -36,10 +36,10 @@
 @InterfaceStability.Evolving
 public final class OzoneFSInputStream extends FSInputStream {

-  private final ChunkGroupInputStream inputStream;
+  private final KeyInputStream inputStream;

   public OzoneFSInputStream(InputStream inputStream) {
-    this.inputStream = (ChunkGroupInputStream)inputStream;
+    this.inputStream = (KeyInputStream)inputStream;
   }

   @Override
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
index 654962b162..9efcc8738c 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
@@ -19,24 +19,24 @@
 package org.apache.hadoop.ozone.s3.io;

 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;

 import java.io.IOException;
 import java.io.InputStream;

 /**
- * S3Wrapper Input Stream which encapsulates ChunkGroupInputStream from ozone.
+ * S3Wrapper Input Stream which encapsulates KeyInputStream from ozone.
  */
 public class S3WrapperInputStream extends FSInputStream {
-  private final ChunkGroupInputStream inputStream;
+  private final KeyInputStream inputStream;

   /**
-   * Constructs S3WrapperInputStream with ChunkInputStream.
+   * Constructs S3WrapperInputStream with KeyInputStream.
    *
    * @param inputStream
    */
   public S3WrapperInputStream(InputStream inputStream) {
-    this.inputStream = (ChunkGroupInputStream) inputStream;
+    this.inputStream = (KeyInputStream) inputStream;
   }

   @Override
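
A note on the seek() logic this rename touches: the @@ -174,7 +174,7 @@ hunk
above shows KeyInputStream mapping a key-level offset onto one of its
BlockInputStream entries with a binary search over streamOffset[], where
streamOffset[i] is the offset within the key at which stream i begins. The
standalone Java sketch below (hypothetical class and method names, not part of
this patch) illustrates that insertion-point arithmetic:

import java.util.Arrays;

/** Sketch of KeyInputStream's offset-to-stream mapping; names are illustrative. */
public class SeekSketch {

  // streamOffset[i] = offset in the key at which block stream i begins,
  // accumulated from stream lengths the same way addStream() does.
  private final long[] streamOffset;

  public SeekSketch(long[] blockLengths) {
    streamOffset = new long[blockLengths.length];
    long offset = 0;
    for (int i = 0; i < blockLengths.length; i++) {
      streamOffset[i] = offset;
      offset += blockLengths[i];
    }
  }

  /** Returns the index of the stream that contains key offset pos. */
  public int streamIndexFor(long pos) {
    int index = Arrays.binarySearch(streamOffset, pos);
    if (index < 0) {
      // On a miss, binarySearch returns -(insertionPoint) - 1; the containing
      // stream is insertionPoint - 1, recovered exactly as the patched seek()
      // does with: currentStreamIndex = -currentStreamIndex - 2;
      index = -index - 2;
    }
    return index;
  }

  public static void main(String[] args) {
    SeekSketch sketch = new SeekSketch(new long[] {100, 100, 100});
    System.out.println(sketch.streamIndexFor(0));   // 0: exact hit on a boundary
    System.out.println(sketch.streamIndexFor(150)); // 1: inside the second stream
    System.out.println(sketch.streamIndexFor(250)); // 2: inside the third stream
  }
}

The selected entry is then asked to seek to the stream-local offset, matching
streamEntries.get(currentStreamIndex).seek(pos - streamOffset[currentStreamIndex])
in the hunk above.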