diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index fa17b7f1c6..0e3bc47cc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@@ -27,18 +29,21 @@
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.storage.ChunkInputStream;
 import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /**
  * Maintaining a list of ChunkInputStream. Read based on offset.
  */
-public class ChunkGroupInputStream extends InputStream {
+public class ChunkGroupInputStream extends InputStream implements Seekable {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ChunkGroupInputStream.class);
@@ -46,7 +51,13 @@ public class ChunkGroupInputStream extends InputStream {
   private static final int EOF = -1;
 
   private final ArrayList<ChunkInputStreamEntry> streamEntries;
+  // streamOffset[i] stores the offset at which chunkInputStream i stores
+  // data in the key
+  private long[] streamOffset = null;
   private int currentStreamIndex;
+  private long length = 0;
+  private boolean closed = false;
+  private String key;
 
   public ChunkGroupInputStream() {
     streamEntries = new ArrayList<>();
@@ -66,19 +77,21 @@ public long getRemainingOfIndex(int index) {
   /**
    * Append another stream to the end of the list.
    *
-   * @param stream the stream instance.
-   * @param length the max number of bytes that should be written to this
-   *               stream.
+   * @param stream the stream instance.
+   * @param streamLength the max number of bytes that should be read from this
+   *                     stream.
    */
-  public synchronized void addStream(InputStream stream, long length) {
-    streamEntries.add(new ChunkInputStreamEntry(stream, length));
+  public synchronized void addStream(ChunkInputStream stream,
+      long streamLength) {
+    streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
   }
 
   @Override
   public synchronized int read() throws IOException {
+    checkNotClosed();
     if (streamEntries.size() <= currentStreamIndex) {
-      throw new IndexOutOfBoundsException();
+      return EOF;
     }
     ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
     int data = entry.read();
@@ -87,6 +100,7 @@ public synchronized int read() throws IOException {
 
   @Override
   public synchronized int read(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
     if (b == null) {
       throw new NullPointerException();
     }
@@ -122,15 +136,82 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
     return totalReadLen;
   }
 
-  private static class ChunkInputStreamEntry extends InputStream {
+  @Override
+  public void seek(long pos) throws IOException {
+    checkNotClosed();
+    if (pos < 0 || pos >= length) {
+      if (pos == 0) {
+        // It is possible for length and pos to be zero in which case
+        // seek should return instead of throwing exception
+        return;
+      }
+      throw new EOFException(
+          "EOF encountered at pos: " + pos + " for key: " + key);
+    }
+    Preconditions.assertTrue(currentStreamIndex >= 0);
+    if (currentStreamIndex >= streamEntries.size()) {
+      currentStreamIndex = Arrays.binarySearch(streamOffset, pos);
+    } else if (pos < streamOffset[currentStreamIndex]) {
+      currentStreamIndex =
+          Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos);
+    } else if (pos >= streamOffset[currentStreamIndex] + streamEntries
+        .get(currentStreamIndex).length) {
+      currentStreamIndex = Arrays
+          .binarySearch(streamOffset, currentStreamIndex + 1,
+              streamEntries.size(), pos);
+    }
+    if (currentStreamIndex < 0) {
+      // Binary search returns -insertionPoint - 1 if element is not present
+      // in the array. insertionPoint is the point at which element would be
+      // inserted in the sorted array. We need to adjust the currentStreamIndex
+      // accordingly so that currentStreamIndex = insertionPoint - 1
+      currentStreamIndex = -currentStreamIndex - 2;
+    }
+    // seek to the proper offset in the ChunkInputStream
+    streamEntries.get(currentStreamIndex)
+        .seek(pos - streamOffset[currentStreamIndex]);
+  }
 
-    private final InputStream inputStream;
+  @Override
+  public long getPos() throws IOException {
+    return length == 0 ? 0 :
+        streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex)
+            .getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public int available() throws IOException {
+    checkNotClosed();
+    long remaining = length - getPos();
+    return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
+  }
+
+  @Override
+  public void close() throws IOException {
+    closed = true;
+    for (int i = 0; i < streamEntries.size(); i++) {
+      streamEntries.get(i).close();
+    }
+  }
+
+  /**
+   * Encapsulates ChunkInputStream.
+   */
+  public static class ChunkInputStreamEntry extends InputStream
+      implements Seekable {
+
+    private final ChunkInputStream chunkInputStream;
     private final long length;
     private long currentPosition;
-
-    ChunkInputStreamEntry(InputStream chunkInputStream, long length) {
-      this.inputStream = chunkInputStream;
+    public ChunkInputStreamEntry(ChunkInputStream chunkInputStream,
+        long length) {
+      this.chunkInputStream = chunkInputStream;
       this.length = length;
       this.currentPosition = 0;
     }
@@ -142,21 +223,36 @@ synchronized long getRemaining() {
 
     @Override
     public synchronized int read(byte[] b, int off, int len)
         throws IOException {
-      int readLen = inputStream.read(b, off, len);
+      int readLen = chunkInputStream.read(b, off, len);
       currentPosition += readLen;
       return readLen;
     }
 
     @Override
     public synchronized int read() throws IOException {
-      int data = inputStream.read();
+      int data = chunkInputStream.read();
       currentPosition += 1;
       return data;
    }
 
     @Override
     public synchronized void close() throws IOException {
-      inputStream.close();
+      chunkInputStream.close();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      chunkInputStream.seek(pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return chunkInputStream.getPos();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
     }
   }
 
@@ -168,8 +264,12 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
     long length = 0;
     String containerKey;
     ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
-    for (KsmKeyLocationInfo ksmKeyLocationInfo :
-        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
+    groupInputStream.key = keyInfo.getKeyName();
+    List<KsmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
+    groupInputStream.streamOffset = new long[keyLocationInfos.size()];
+    for (int i = 0; i < keyLocationInfos.size(); i++) {
+      KsmKeyLocationInfo ksmKeyLocationInfo = keyLocationInfos.get(i);
       String containerName = ksmKeyLocationInfo.getContainerName();
       Pipeline pipeline =
           storageContainerLocationClient.getContainer(containerName);
@@ -180,6 +280,7 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
       try {
         LOG.debug("get key accessing {} {}",
             xceiverClient.getPipeline().getContainerName(), containerKey);
+        groupInputStream.streamOffset[i] = length;
         ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
             .containerKeyDataForRead(
                 xceiverClient.getPipeline().getContainerName(), containerKey);
@@ -202,6 +303,19 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
         }
       }
     }
+    groupInputStream.length = length;
     return new LengthInputStream(groupInputStream, length);
   }
+
+  /**
+   * Verify that the input stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 80e0c785f3..05a632e2f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -19,6 +19,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
@@ -72,6 +73,7 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
   private final String requestID;
+  private boolean closed;
 
   /**
    * A constructor for testing purpose only.
@@ -86,6 +88,7 @@ public ChunkGroupOutputStream() {
     xceiverClientManager = null;
     chunkSize = 0;
     requestID = null;
+    closed = false;
   }
 
   /**
@@ -196,6 +199,8 @@ public long getByteOffset() {
 
   @Override
   public synchronized void write(int b) throws IOException {
+    checkNotClosed();
+
     if (streamEntries.size() <= currentStreamIndex) {
       Preconditions.checkNotNull(ksmClient);
       // allocate a new block, if a exception happens, log an error and
@@ -230,6 +235,8 @@ public synchronized void write(int b) throws IOException {
   @Override
   public synchronized void write(byte[] b, int off, int len)
       throws IOException {
+    checkNotClosed();
+
     if (b == null) {
       throw new NullPointerException();
     }
@@ -286,6 +293,7 @@ private void allocateNewBlock(int index) throws IOException {
 
   @Override
   public synchronized void flush() throws IOException {
+    checkNotClosed();
     for (int i = 0; i <= currentStreamIndex; i++) {
       streamEntries.get(i).flush();
     }
@@ -298,6 +306,10 @@ public synchronized void flush() throws IOException {
    */
   @Override
   public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
     for (ChunkOutputStreamEntry entry : streamEntries) {
       if (entry != null) {
         entry.close();
@@ -464,4 +476,17 @@ public void close() throws IOException {
       }
     }
   }
+
+  /**
+   * Verify that the output stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs
+          .getKeyName());
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
index 7910d03aaa..3857bd0336 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
@@ -49,4 +49,12 @@ public synchronized void close() throws IOException {
     inputStream.close();
   }
 
+  @Override
+  public int available() throws IOException {
+    return inputStream.available();
+  }
+
+  public InputStream getInputStream() {
+    return inputStream;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index 1e535c09c8..5369220a43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -57,4 +57,8 @@ public synchronized void close() throws IOException {
     //commitKey can be done here, if needed.
     outputStream.close();
   }
+
+  public OutputStream getOutputStream() {
+    return outputStream;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
index 26cef65634..943be2ab4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.scm.storage;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 
 import com.google.protobuf.ByteString;
 
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.scm.XceiverClientSpi;
@@ -38,7 +41,7 @@
  * instances. This class encapsulates all state management for iterating
  * through the sequence of chunks and the sequence of buffers within each chunk.
  */
-public class ChunkInputStream extends InputStream {
+public class ChunkInputStream extends InputStream implements Seekable {
 
   private static final int EOF = -1;
 
@@ -47,9 +50,10 @@ public class ChunkInputStream extends InputStream {
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
   private List<ChunkInfo> chunks;
-  private int chunkOffset;
+  private int chunkIndex;
+  private long[] chunkOffset;
   private List<ByteBuffer> buffers;
-  private int bufferOffset;
+  private int bufferIndex;
 
   /**
    * Creates a new ChunkInputStream.
@@ -67,9 +71,21 @@ public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
     this.chunks = chunks;
-    this.chunkOffset = 0;
+    this.chunkIndex = -1;
+    // chunkOffset[i] stores offset at which chunk i stores data in
+    // ChunkInputStream
+    this.chunkOffset = new long[this.chunks.size()];
+    initializeChunkOffset();
     this.buffers = null;
-    this.bufferOffset = 0;
+    this.bufferIndex = 0;
+  }
+
+  private void initializeChunkOffset() {
+    int tempOffset = 0;
+    for (int i = 0; i < chunks.size(); i++) {
+      chunkOffset[i] = tempOffset;
+      tempOffset += chunks.get(i).getLen();
+    }
   }
 
   @Override
@@ -77,7 +93,8 @@ public synchronized int read() throws IOException {
     checkOpen();
     int available = prepareRead(1);
-    return available == EOF ? EOF : buffers.get(bufferOffset).get();
+    return available == EOF ? EOF :
+        Byte.toUnsignedInt(buffers.get(bufferIndex).get());
   }
 
   @Override
@@ -106,7 +123,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
     if (available == EOF) {
       return EOF;
     }
-    buffers.get(bufferOffset).get(b, off, available);
+    buffers.get(bufferIndex).get(b, off, available);
     return available;
   }
 
@@ -144,20 +161,20 @@ private synchronized int prepareRead(int len) throws IOException {
       return EOF;
     } else if (buffers == null) {
       // The first read triggers fetching the first chunk.
-      readChunkFromContainer(0);
+      readChunkFromContainer();
     } else if (!buffers.isEmpty() &&
-        buffers.get(bufferOffset).hasRemaining()) {
+        buffers.get(bufferIndex).hasRemaining()) {
       // Data is available from the current buffer.
-      ByteBuffer bb = buffers.get(bufferOffset);
+      ByteBuffer bb = buffers.get(bufferIndex);
       return len > bb.remaining() ? bb.remaining() : len;
     } else if (!buffers.isEmpty() &&
-        !buffers.get(bufferOffset).hasRemaining() &&
-        bufferOffset < buffers.size() - 1) {
+        !buffers.get(bufferIndex).hasRemaining() &&
+        bufferIndex < buffers.size() - 1) {
       // There are additional buffers available.
-      ++bufferOffset;
-    } else if (chunkOffset < chunks.size() - 1) {
+      ++bufferIndex;
+    } else if (chunkIndex < chunks.size() - 1) {
       // There are additional chunks available.
-      readChunkFromContainer(chunkOffset + 1);
+      readChunkFromContainer();
     } else {
       // All available input has been consumed.
       return EOF;
@@ -170,20 +187,75 @@ private synchronized int prepareRead(int len) throws IOException {
    * successful, then the data of the read chunk is saved so that its bytes can
    * be returned from subsequent read calls.
    *
-   * @param readChunkOffset offset in the chunk list of which chunk to read
    * @throws IOException if there is an I/O error while performing the call
    */
-  private synchronized void readChunkFromContainer(int readChunkOffset)
-      throws IOException {
+  private synchronized void readChunkFromContainer() throws IOException {
+    // On every chunk read chunkIndex should be increased so as to read the
+    // next chunk
+    chunkIndex += 1;
     final ReadChunkResponseProto readChunkResponse;
     try {
       readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
-          chunks.get(readChunkOffset), key, traceID);
+          chunks.get(chunkIndex), key, traceID);
     } catch (IOException e) {
       throw new IOException("Unexpected OzoneException: " + e.toString(), e);
     }
-    chunkOffset = readChunkOffset;
     ByteString byteString = readChunkResponse.getData();
     buffers = byteString.asReadOnlyByteBufferList();
+    bufferIndex = 0;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if (pos < 0 || (chunks.size() == 0 && pos > 0)
+        || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
+        .getLen()) {
+      throw new EOFException(
+          "EOF encountered at pos: " + pos + " for container key: " + key);
+    }
+    if (chunkIndex == -1) {
+      chunkIndex = Arrays.binarySearch(chunkOffset, pos);
+    } else if (pos < chunkOffset[chunkIndex]) {
+      chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
+    } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
+        .getLen()) {
+      chunkIndex =
+          Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos);
+    }
+    if (chunkIndex < 0) {
+      // Binary search returns -insertionPoint - 1 if element is not present
+      // in the array. insertionPoint is the point at which element would be
+      // inserted in the sorted array. We need to adjust the chunkIndex
+      // accordingly so that chunkIndex = insertionPoint - 1
+      chunkIndex = -chunkIndex - 2;
+    }
+    // adjust chunkIndex so that readChunkFromContainer reads the correct chunk
+    chunkIndex -= 1;
+    readChunkFromContainer();
+    adjustBufferIndex(pos);
+  }
+
+  private void adjustBufferIndex(long pos) {
+    long tempOffset = chunkOffset[chunkIndex];
+    for (int i = 0; i < buffers.size(); i++) {
+      if (pos - tempOffset >= buffers.get(i).capacity()) {
+        tempOffset += buffers.get(i).capacity();
+      } else {
+        bufferIndex = i;
+        break;
+      }
+    }
+    buffers.get(bufferIndex).position((int) (pos - tempOffset));
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return chunkIndex == -1 ? 0 :
+        chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java
index a6acc359fb..57c2002f0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java
@@ -206,6 +206,10 @@ public StorageContainerManager getStorageContainerManager() {
     return this.scm;
   }
 
+  public OzoneConfiguration getConf() {
+    return conf;
+  }
+
   @Override
   public KeySpaceManager getKeySpaceManager() {
     return this.ksm;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
index c4a5c7286b..7f1b8ca0ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
@@ -19,14 +19,15 @@
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.scm.storage.ChunkInputStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 
 import static org.junit.Assert.assertEquals;
@@ -111,13 +112,44 @@ public void testErrorWriteGroupOutputStream() throws Exception {
   @Test
   public void testReadGroupInputStream() throws Exception {
     try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
-      ArrayList<InputStream> inputStreams = new ArrayList<>();
+      ArrayList<ChunkInputStream> inputStreams = new ArrayList<>();
 
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes();
       int offset = 0;
       for (int i = 0; i < 5; i++) {
-        ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
+        int tempOffset = offset;
+        ChunkInputStream in =
+            new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
+              private ByteArrayInputStream in =
+                  new ByteArrayInputStream(buf, tempOffset, 100);
+
+              @Override
+              public void seek(long pos) throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public long getPos() throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public boolean seekToNewSource(long targetPos)
+                  throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public int read() throws IOException {
+                return in.read();
+              }
+
+              @Override
+              public int read(byte[] b, int off, int len) throws IOException {
+                return in.read(b, off, len);
+              }
+            };
         inputStreams.add(in);
         offset += 100;
         groupInputStream.addStream(in, 100);
@@ -134,13 +166,44 @@ public void testReadGroupInputStream() throws Exception {
 
   @Test
   public void testErrorReadGroupInputStream() throws Exception {
     try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
-      ArrayList<InputStream> inputStreams = new ArrayList<>();
+      ArrayList<ChunkInputStream> inputStreams = new ArrayList<>();
 
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes();
       int offset = 0;
       for (int i = 0; i < 5; i++) {
-        ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
+        int tempOffset = offset;
+        ChunkInputStream in =
+            new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
+              private ByteArrayInputStream in =
+                  new ByteArrayInputStream(buf, tempOffset, 100);
+
+              @Override
+              public void seek(long pos) throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public long getPos() throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public boolean seekToNewSource(long targetPos)
+                  throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public int read() throws IOException {
+                return in.read();
+              }
+
+              @Override
+              public int read(byte[] b, int off, int len) throws IOException {
+                return in.read(b, off, len);
+              }
+            };
         inputStreams.add(in);
         offset += 100;
         groupInputStream.addStream(in, 100);
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
new file mode 100644
index 0000000000..3a0a8fe2ff
--- /dev/null
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The input stream for Ozone file system.
+ *
+ * TODO: Make inputStream generic for both rest and rpc clients
+ * This class is not thread safe.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class OzoneFSInputStream extends FSInputStream {
+
+  private final ChunkGroupInputStream inputStream;
+
+  public OzoneFSInputStream(InputStream inputStream) {
+    this.inputStream = (ChunkGroupInputStream)inputStream;
+  }
+
+  @Override
+  public int read() throws IOException {
+    return inputStream.read();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    inputStream.close();
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    inputStream.seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return inputStream.getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return inputStream.available();
+  }
+}
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
new file mode 100644
index 0000000000..faa3628715
--- /dev/null
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+
+
+/**
+ * The output stream for Ozone file system.
+ *
+ * TODO: Make outputStream generic for both rest and rpc clients
+ * This class is not thread safe.
+ */
+public class OzoneFSOutputStream extends OutputStream {
+
+  private final ChunkGroupOutputStream outputStream;
+
+  public OzoneFSOutputStream(OutputStream outputStream) {
+    this.outputStream = (ChunkGroupOutputStream)outputStream;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    outputStream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    outputStream.write(b, off, len);
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    outputStream.flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    outputStream.close();
+  }
+}
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index bf3c08313e..485f2c076a 100644
--- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -18,16 +18,15 @@
 
 package org.apache.hadoop.fs.ozone;
 
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Iterator;
 
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -35,12 +34,18 @@
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.hadoop.ozone.web.client.OzoneKey;
-import org.apache.hadoop.ozone.web.client.OzoneRestClient;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.ReplicationFactor;
+import org.apache.hadoop.ozone.client.ReplicationType;
 import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,19 +55,16 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.ozone.web.client.OzoneBucket;
-import org.apache.hadoop.ozone.web.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
-import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
 
 /**
  * The Ozone Filesystem implementation.
@@ -78,11 +80,15 @@ public class OzoneFileSystem extends FileSystem {
   static final Logger LOG = LoggerFactory.getLogger(OzoneFileSystem.class);
 
   /** The Ozone client for connecting to Ozone server. */
-  private OzoneRestClient ozone;
+  private OzoneClient ozoneClient;
+  private ObjectStore objectStore;
+  private OzoneVolume volume;
   private OzoneBucket bucket;
   private URI uri;
   private String userName;
   private Path workingDir;
+  private ReplicationType replicationType;
+  private ReplicationFactor replicationFactor;
 
   @Override
   public void initialize(URI name, Configuration conf) throws IOException {
@@ -115,23 +121,24 @@ public void initialize(URI name, Configuration conf) throws IOException {
           .setPath(OZONE_URI_DELIMITER + volumeStr + OZONE_URI_DELIMITER
               + bucketStr + OZONE_URI_DELIMITER).build();
       LOG.trace("Ozone URI for ozfs initialization is " + uri);
-      this.ozone = new OzoneRestClient(OZONE_HTTP_SCHEME + hostStr);
+      this.ozoneClient = OzoneClientFactory.getRpcClient(conf);
+      objectStore = ozoneClient.getObjectStore();
+      this.volume = objectStore.getVolume(volumeStr);
+      this.bucket = volume.getBucket(bucketStr);
+      this.replicationType = ReplicationType.valueOf(
+          conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+              OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
+      this.replicationFactor = ReplicationFactor.valueOf(
+          conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
+              OzoneConfigKeys.OZONE_REPLICATION_DEFAULT));
       try {
        this.userName = UserGroupInformation.getCurrentUser().getShortUserName();
       } catch (IOException e) {
         this.userName = OZONE_DEFAULT_USER;
       }
-      this.ozone.setUserAuth(userName);
-
-      OzoneVolume volume = ozone.getVolume(volumeStr);
-      this.bucket = volume.getBucket(bucketStr);
       this.workingDir = new Path(OZONE_USER_DIR, this.userName)
           .makeQualified(this.uri, this.workingDir);
-    } catch (OzoneException oe) {
-      final String msg = "Ozone server exception when initializing file system";
-      LOG.error(msg, oe);
-      throw new IOException(msg, oe);
     } catch (URISyntaxException ue) {
       final String msg = "Invalid Ozone endpoint " + name;
       LOG.error(msg, ue);
@@ -142,7 +149,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
   @Override
   public void close() throws IOException {
     try {
-      ozone.close();
+      ozoneClient.close();
     } finally {
       super.close();
     }
@@ -162,14 +169,13 @@ public String getScheme() {
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
     LOG.trace("open() path:{}", f);
     final FileStatus fileStatus = getFileStatus(f);
-
+    final String key = pathToKey(f);
     if (fileStatus.isDirectory()) {
       throw new FileNotFoundException("Can't open directory " + f + " to read");
     }
 
     return new FSDataInputStream(
-        new OzoneInputStream(getConf(), uri, bucket, pathToKey(f),
-            fileStatus.getLen(), bufferSize, statistics));
+        new OzoneFSInputStream(bucket.readKey(key).getInputStream()));
   }
 
   @Override
@@ -206,11 +212,12 @@ public FSDataOutputStream create(Path f, FsPermission permission,
       // does not exists and a new file can thus be created.
     }
 
-    final OzoneOutputStream stream =
-        new OzoneOutputStream(getConf(), uri, bucket, key, this.statistics);
+    OzoneOutputStream ozoneOutputStream =
+        bucket.createKey(key, 0, replicationType, replicationFactor);
     // We pass null to FSDataOutputStream so it won't count writes that
     // are being buffered to a file
-    return new FSDataOutputStream(stream, null);
+    return new FSDataOutputStream(
+        new OzoneFSOutputStream(ozoneOutputStream.getOutputStream()), null);
   }
 
   @Override
@@ -245,7 +252,7 @@ private class RenameIterator extends OzoneListingIterator {
 
     RenameIterator(Path srcPath, Path dstPath) throws IOException {
-      super(srcPath, true);
+      super(srcPath);
       srcKey = pathToKey(srcPath);
       dstKey = pathToKey(dstPath);
       LOG.trace("rename from:{} to:{}", srcKey, dstKey);
@@ -253,30 +260,17 @@ private class RenameIterator extends OzoneListingIterator {
     }
 
     boolean processKey(String key) throws IOException {
       String newKeyName = dstKey.concat(key.substring(srcKey.length()));
-      return rename(key, newKeyName);
+      rename(key, newKeyName);
+      return true;
     }
 
-    // TODO: currently rename work by copying the file, with changes in KSM,
-    // this operation can be made improved by renaming the keys in KSM directly.
-    private boolean rename(String src, String dst) throws IOException {
-      final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
-      final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
-          LocalDirAllocator.SIZE_UNKNOWN, getConf());
-
-      try {
-        LOG.trace("rename by copying file from:{} to:{}", src, dst);
-        bucket.getKey(src, tmpFile.toPath());
-        bucket.putKey(dst, tmpFile);
-        return true;
-      } catch (OzoneException oe) {
-        String msg = String.format("Error when renaming key from:%s to:%s",
-            src, dst);
-        LOG.error(msg, oe);
-        throw new IOException(msg, oe);
-      } finally {
-        if (!tmpFile.delete()) {
-          LOG.warn("Can not delete tmpFile: " + tmpFile);
-        }
+    // TODO: currently rename works by copying the streams, with changes in
+    // KSM, this operation can be improved by renaming the keys in KSM directly.
+    private void rename(String src, String dst) throws IOException {
+      try (OzoneInputStream inputStream = bucket.readKey(src);
+          OzoneOutputStream outputStream = bucket
+              .createKey(dst, 0, replicationType, replicationFactor)) {
+        IOUtils.copyBytes(inputStream, outputStream, getConf());
       }
     }
   }
@@ -386,8 +380,13 @@ private class DeleteIterator extends OzoneListingIterator {
     private boolean recursive;
 
     DeleteIterator(Path f, boolean recursive) throws IOException {
-      super(f, recursive);
+      super(f);
       this.recursive = recursive;
+      if (getStatus().isDirectory()
+          && !this.recursive
+          && listStatus(f).length != 0) {
+        throw new PathIsNotEmptyDirectoryException(f.toString());
+      }
     }
 
     boolean processKey(String key) throws IOException {
@@ -421,7 +420,7 @@ private class ListStatusIterator extends OzoneListingIterator {
     private Path f;
 
     ListStatusIterator(Path f) throws IOException {
-      super(f, true);
+      super(f);
       this.f = f;
     }
 
@@ -532,8 +531,7 @@ public FileStatus getFileStatus(Path f) throws IOException {
 
     if (key.length() == 0) {
       return new FileStatus(0, true, 1, 0,
-          getModifiedTime(bucket.getCreatedOn(), OZONE_URI_DELIMITER),
-          qualifiedPath);
+          bucket.getCreationTime(), qualifiedPath);
     }
 
     // consider this a file and get key status
@@ -548,14 +546,11 @@ public FileStatus getFileStatus(Path f) throws IOException {
       throw new FileNotFoundException(f + ": No such file or directory!");
     } else if (isDirectory(meta)) {
       return new FileStatus(0, true, 1, 0,
-          getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
-          qualifiedPath);
+          meta.getModificationTime(), qualifiedPath);
     } else {
       //TODO: Fetch replication count from ratis config
-      return new FileStatus(meta.getObjectInfo().getSize(), false, 1,
-          getDefaultBlockSize(f),
-          getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
-          qualifiedPath);
+      return new FileStatus(meta.getDataSize(), false, 1,
+          getDefaultBlockSize(f), meta.getModificationTime(), qualifiedPath);
     }
   }
 
@@ -566,54 +561,23 @@
    */
   private OzoneKey getKeyInfo(String key) {
     try {
-      return bucket.getKeyInfo(key);
-    } catch (OzoneException e) {
+      return bucket.getKey(key);
+    } catch (IOException e) {
       LOG.trace("Key:{} does not exists", key);
       return null;
     }
   }
 
-  /**
-   * Helper method to get the modified time of the key.
-   * @param key key to fetch the modified time
-   * @return last modified time of the key
-   */
-  private long getModifiedTime(String modifiedTime, String key) {
-    try {
-      return OzoneUtils.formatDate(modifiedTime);
-    } catch (ParseException pe) {
-      LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
-      return 0;
-    }
-  }
-
   /**
    * Helper method to check if an Ozone key is representing a directory.
   * @param key key to be checked as a directory
   * @return true if key is a directory, false otherwise
   */
   private boolean isDirectory(OzoneKey key) {
-    LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(),
-        key.getObjectInfo().getSize());
-    return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER)
-        && (key.getObjectInfo().getSize() == 0);
-  }
-
-  /**
-   * Helper method to list entries matching the key name in bucket.
-   * @param dirKey key prefix for listing the keys
-   * @param lastKey last iterated key
-   * @return List of Keys
-   */
-  List<OzoneKey> listKeys(String dirKey, String lastKey)
-      throws IOException {
-    LOG.trace("list keys dirKey:{} lastKey:{}", dirKey, lastKey);
-    try {
-      return bucket.listKeys(dirKey, LISTING_PAGE_SIZE, lastKey);
-    } catch (OzoneException oe) {
-      LOG.error("list keys failed dirKey:{} lastKey:{}", dirKey, lastKey, oe);
-      throw new IOException("List keys failed " + oe.getMessage());
-    }
+    LOG.trace("key name:{} size:{}", key.getName(),
+        key.getDataSize());
+    return key.getName().endsWith(OZONE_URI_DELIMITER)
+        && (key.getDataSize() == 0);
   }
 
   /**
@@ -623,11 +587,11 @@ List<OzoneKey> listKeys(String dirKey, String lastKey)
    */
   private boolean createDirectory(String keyName) {
     try {
-      LOG.trace("creating dir for key:{}", keyName);
-      bucket.putKey(keyName, "");
+      LOG.info("creating dir for key:{}", keyName);
+      bucket.createKey(keyName, 0, replicationType, replicationFactor).close();
       return true;
-    } catch (OzoneException oe) {
-      LOG.error("create key failed for key:{}", keyName, oe);
+    } catch (IOException ioe) {
+      LOG.error("create key failed for key:{}", keyName, ioe);
       return false;
     }
   }
@@ -642,8 +606,8 @@ private boolean deleteObject(String keyName) {
     try {
       bucket.deleteKey(keyName);
       return true;
-    } catch (OzoneException oe) {
-      LOG.error("delete key failed " + oe.getMessage());
+    } catch (IOException ioe) {
+      LOG.error("delete key failed " + ioe.getMessage());
      return false;
    }
  }
@@ -671,7 +635,7 @@ public String pathToKey(Path path) {
    * @param key the ozone Key which needs to be appended
    * @return delimiter appended key
    */
-  String addTrailingSlashIfNeeded(String key) {
+  private String addTrailingSlashIfNeeded(String key) {
     if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
       return key + OZONE_URI_DELIMITER;
     } else {
@@ -690,47 +654,36 @@ public String toString() {
 
   private abstract class OzoneListingIterator {
     private final Path path;
-    private final boolean recursive;
     private final FileStatus status;
     private String pathKey;
+    private Iterator<OzoneKey> keyIterator;
 
-    OzoneListingIterator(Path path, boolean recursive)
+    OzoneListingIterator(Path path)
         throws IOException {
       this.path = path;
-      this.recursive = recursive;
      this.status = getFileStatus(path);
      this.pathKey = pathToKey(path);
      if (status.isDirectory()) {
        this.pathKey = addTrailingSlashIfNeeded(pathKey);
      }
+      keyIterator = bucket.listKeys(pathKey);
     }
 
     abstract boolean processKey(String key) throws IOException;
 
     // iterates all the keys in the particular path
     boolean iterate() throws IOException {
-      LOG.trace("Iterating path {} - recursive {}", path, recursive);
+      LOG.trace("Iterating path {}", path);
       if (status.isDirectory()) {
         LOG.trace("Iterating directory:{}", pathKey);
-        String lastKey = pathKey;
-        while (true) {
-          List<OzoneKey> ozoneKeys = listKeys(pathKey, lastKey);
-          LOG.trace("number of sub keys:{}", ozoneKeys.size());
-          if (ozoneKeys.size() == 0) {
-            return processKey(pathKey);
-          } else {
-            if (!recursive) {
-              throw new PathIsNotEmptyDirectoryException(path.toString());
-            } else {
-              for (OzoneKey ozoneKey : ozoneKeys) {
-                lastKey = ozoneKey.getObjectInfo().getKeyName();
-                if (!processKey(lastKey)) {
-                  return false;
-                }
-              }
-            }
+        while (keyIterator.hasNext()) {
+          OzoneKey key = keyIterator.next();
+          LOG.info("iterating key:{}", key.getName());
+          if (!processKey(key.getName())) {
+            return false;
           }
         }
+        return true;
       } else {
         LOG.trace("iterating file:{}", path);
         return processKey(pathKey);
@@ -744,5 +697,9 @@ String getPathKey() {
     boolean pathIsDirectory() {
       return status.isDirectory();
     }
+
+    FileStatus getStatus() {
+      return status;
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java
deleted file mode 100644
index bd713585db..0000000000
--- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.ozone;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.URI;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.ozone.web.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
-
-/**
- * Wraps OzoneInputStream implementation.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public final class OzoneInputStream extends FSInputStream {
-  private static final Log LOG = LogFactory.getLog(OzoneInputStream.class);
-
-  private final RandomAccessFile in;
-
-  /** Closed bit. Volatile so reads are non-blocking. */
-  private volatile boolean closed = false;
-
-  /** the ozone bucket client. */
-  private final OzoneBucket bucket;
-
-  /** The object key. */
-  private final String key;
-
-  /** Object content length. */
-  private final long contentLen;
-
-  /** file system stats. */
-  private final Statistics stats;
-
-  private final URI keyUri;
-
-  OzoneInputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
-      String key, long contentLen, int bufferSize, Statistics statistics)
-      throws IOException {
-    Objects.requireNonNull(bucket, "bucket can not be null!");
-    Objects.requireNonNull(key, "kenName can not be null!");
-    this.bucket = bucket;
-    this.key = key;
-    this.contentLen = contentLen;
-    this.stats = statistics;
-    this.keyUri = fsUri.resolve(key);
-
-    if (conf.get(BUFFER_DIR_KEY) == null) {
-      conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
-    }
-    final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
-    final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
-        LocalDirAllocator.SIZE_UNKNOWN, conf);
-    try {
-      LOG.trace("Get Key:" + this.keyUri + " tmp-file:" + tmpFile.toPath());
-      bucket.getKey(this.key, tmpFile.toPath());
-      in = new RandomAccessFile(tmpFile, "r");
-      statistics.incrementReadOps(1);
-    } catch (OzoneException oe) {
-      final String msg = "Error when getBytes for key = " + key;
-      LOG.error(msg, oe);
-      throw new IOException(msg, oe);
-    }
-  }
-
-  @Override
-  public synchronized void seek(long targetPos) throws IOException {
-    checkNotClosed();
-    // Do not allow negative seek
-    if (targetPos < 0) {
-      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + targetPos);
-    }
-
-    if (this.contentLen <= 0) {
-      return;
-    }
-
-    in.seek(targetPos);
-  }
-
-  @Override
-  public synchronized long getPos() throws IOException {
-    checkNotClosed();
-    return in.getFilePointer();
-  }
-
-  @Override
-  public boolean seekToNewSource(long l) throws IOException {
-    return false;
-  }
-
-  @Override
-  public synchronized int read() throws IOException {
-    int ch = in.read();
-    if (stats != null && ch != -1) {
-      stats.incrementBytesRead(1);
-    }
-    return ch;
-  }
-
-  @Override
-  public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-    Preconditions.checkArgument(buffer != null, "buffer can not be null");
-    int numberOfByteRead = super.read(position, buffer, offset, length);
-
-    if (stats != null && numberOfByteRead > 0) {
-      stats.incrementBytesRead(numberOfByteRead);
-    }
-    return numberOfByteRead;
-  }
-
-  @Override
-  public synchronized int read(byte[] buffer, int offset, int length)
-      throws IOException {
-    Preconditions.checkArgument(buffer != null, "buffer can not be null");
-    int numberOfByteRead = in.read(buffer, offset, length);
-    if (stats != null && numberOfByteRead > 0) {
-      stats.incrementBytesRead(numberOfByteRead);
-    }
-    return numberOfByteRead;
-  }
-
-  @Override
-  public synchronized int available() throws IOException {
-    checkNotClosed();
-
-    final long remainingInWrapped = contentLen - in.getFilePointer();
-    return (remainingInWrapped < Integer.MAX_VALUE)
-        ? (int)remainingInWrapped
-        : Integer.MAX_VALUE;
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    in.close();
-  }
-
-  @Override
-  public synchronized long skip(long pos) throws IOException {
-    return in.skipBytes((int) pos);
-  }
-
-  /**
-   * Verify that the input stream is open. Non blocking; this gives
-   * the last state of the volatile {@link #closed} field.
-   * @throws IOException if the connection is closed.
-   */
-  private void checkNotClosed() throws IOException {
-    if (closed) {
-      throw new IOException(this.keyUri + ": "
-          + FSExceptionMessages.STREAM_IS_CLOSED);
-    }
-  }
-
-}
diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java
deleted file mode 100644
index 3b5df3b255..0000000000
--- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.ozone;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.ozone.web.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
-
-
-/**
- * The output stream for Ozone file system.
- *
- * Data will be buffered on local disk, then uploaded to Ozone in
- * {@link #close()} method.
- *
- * This class is not thread safe.
- */
-public class OzoneOutputStream extends OutputStream {
-  private static final Log LOG = LogFactory.getLog(OzoneOutputStream.class);
-  private OzoneBucket bucket;
-  private final String key;
-  private final URI keyUri;
-  private Statistics statistics;
-  private LocalDirAllocator dirAlloc;
-  private boolean closed;
-  private File tmpFile;
-  private BufferedOutputStream backupStream;
-
-  OzoneOutputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
-      String key, Statistics statistics) throws IOException {
-    this.bucket = bucket;
-    this.key = key;
-    this.keyUri = fsUri.resolve(key);
-    this.statistics = statistics;
-
-    if (conf.get(BUFFER_DIR_KEY) == null) {
-      conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
-    }
-    dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
-    tmpFile = dirAlloc.createTmpFileForWrite("output-",
-        LocalDirAllocator.SIZE_UNKNOWN, conf);
-    backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
-
-    closed = false;
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    if (backupStream != null) {
-      backupStream.close();
-    }
-    try {
-      LOG.trace("Put tmp-file:" + tmpFile + " to key "+ keyUri);
-      bucket.putKey(key, tmpFile);
-      statistics.incrementWriteOps(1);
-    } catch (OzoneException oe) {
-      final String msg = "Uploading error: file=" + tmpFile + ", key=" + key;
-      LOG.error(msg, oe);
-      throw new IOException(msg, oe);
-    } finally {
-      if (!tmpFile.delete()) {
-        LOG.warn("Can not delete tmpFile: " + tmpFile);
-      }
-    }
-  }
-
-  @Override
-  public synchronized void flush() throws IOException {
-    backupStream.flush();
-  }
-
-  @Override
-  public synchronized void write(int b) throws IOException {
-    backupStream.write(b);
-    statistics.incrementBytesWritten(1);
-  }
-
-}
diff --git a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
index b905ea1a82..9f8137e050 100644
--- a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
+++ b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
@@ -114,7 +114,9 @@ public void testOzFsReadWrite() throws IOException {
     }
 
     FileStatus status = fs.getFileStatus(path);
-    Assert.assertTrue(status.getModificationTime() < currentTime);
+    // The timestamp of the newly created file should always be greater than
+    // the time when the test was started
+    Assert.assertTrue(status.getModificationTime() > currentTime);
 
     try (FSDataInputStream inputStream = fs.open(path)) {
       byte[] buffer = new byte[stringLen];
diff --git a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/contract/OzoneContract.java b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/contract/OzoneContract.java
index 820c5c0085..5ac2163d2e 100644
--- a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/contract/OzoneContract.java
+++ b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/contract/OzoneContract.java
@@ -35,6 +35,8 @@
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.scm.ScmConfigKeys;
 import org.junit.Assert;
 
 import java.io.IOException;
@@ -76,6 +78,10 @@ public static void createCluster() throws IOException {
     storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
   }
 
+  private void copyClusterConfigs(String configKey) {
+    getConf().set(configKey, cluster.getConf().get(configKey));
+  }
+
   @Override
   public FileSystem getTestFileSystem() throws IOException {
     //assumes cluster is not null
@@ -95,8 +101,6 @@ public FileSystem getTestFileSystem() throws IOException {
     BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
     try {
       storageHandler.createVolume(volumeArgs);
-
-      storageHandler.createBucket(bucketArgs);
     } catch (OzoneException e) {
       throw new IOException(e.getMessage());
     }
@@ -107,6 +111,8 @@ public FileSystem getTestFileSystem() throws IOException {
     String uri = String.format("%s://localhost:%d/%s/%s",
         Constants.OZONE_URI_SCHEME, port, volumeName, bucketName);
     getConf().set("fs.defaultFS", uri);
+    copyClusterConfigs(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY);
+    copyClusterConfigs(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
    return FileSystem.get(getConf());
  }
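
Reviewer note (not part of the patch): both `ChunkGroupInputStream#seek` and `ChunkInputStream#seek` rely on the `Arrays.binarySearch` contract — when the target position is not an exact start offset, the return value is `-insertionPoint - 1`, so `-result - 2` recovers the index of the entry whose byte range covers the position. A minimal, self-contained sketch of that lookup (the `OffsetLookup` class name and sample offsets are illustrative only, not code from the patch):

```java
import java.util.Arrays;

public final class OffsetLookup {

  // offsets[i] is the byte offset at which entry i starts, built as a
  // prefix sum exactly like streamOffset[] / chunkOffset[] in the patch.
  static int findEntry(long[] offsets, long pos) {
    int index = Arrays.binarySearch(offsets, pos);
    if (index < 0) {
      // binarySearch returned -insertionPoint - 1; the covering entry is
      // the one starting at offsets[insertionPoint - 1].
      index = -index - 2;
    }
    return index;
  }

  public static void main(String[] args) {
    long[] offsets = {0, 100, 200, 300, 400}; // five 100-byte entries
    System.out.println(findEntry(offsets, 0));   // 0 (exact hit on a start offset)
    System.out.println(findEntry(offsets, 150)); // 1 (falls inside entry 1)
    System.out.println(findEntry(offsets, 399)); // 3 (last byte of entry 3)
  }
}
```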
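
A second note on the `Byte.toUnsignedInt` change in `ChunkInputStream#read()`: `ByteBuffer#get()` returns a signed `byte`, so any data byte of 0x80 or above would previously be sign-extended to a negative int — 0xFF reads as -1, which callers cannot distinguish from the EOF sentinel. A short demonstration under that assumption (values chosen purely for illustration):

```java
import java.nio.ByteBuffer;

public final class UnsignedReadDemo {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) 0xFF});
    byte b = buf.get();
    // Plain widening keeps the sign: 0xFF reads as -1, i.e. a false EOF.
    System.out.println((int) b);               // -1
    // Byte.toUnsignedInt maps the byte into 0..255, as the patched read() does.
    System.out.println(Byte.toUnsignedInt(b)); // 255
  }
}
```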