diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml index 40d78d0cd6..c55f8e3e61 100644 --- a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml @@ -15,4 +15,12 @@ limitations under the License. --> + + + + + + diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java new file mode 100644 index 0000000000..e5bfc2c4d9 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Used by {@link AliyunOSSInputStream} as a task that is submitted + * to the thread pool. + * Each AliyunOSSFileReaderTask reads one part of the file so that + * we can accelerate the sequential read. + */ +public class AliyunOSSFileReaderTask implements Runnable { + public static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSFileReaderTask.class); + + private String key; + private AliyunOSSFileSystemStore store; + private ReadBuffer readBuffer; + private static final int MAX_RETRIES = 3; + private RetryPolicy retryPolicy; + + public AliyunOSSFileReaderTask(String key, AliyunOSSFileSystemStore store, + ReadBuffer readBuffer) { + this.key = key; + this.store = store; + this.readBuffer = readBuffer; + RetryPolicy defaultPolicy = + RetryPolicies.retryUpToMaximumCountWithFixedSleep( + MAX_RETRIES, 3, TimeUnit.SECONDS); + Map, RetryPolicy> policies = new HashMap<>(); + policies.put(IOException.class, defaultPolicy); + policies.put(IndexOutOfBoundsException.class, + RetryPolicies.TRY_ONCE_THEN_FAIL); + policies.put(NullPointerException.class, + RetryPolicies.TRY_ONCE_THEN_FAIL); + + this.retryPolicy = RetryPolicies.retryByException(defaultPolicy, policies); + } + + @Override + public void run() { + int retries = 0; + readBuffer.lock(); + try { +
while (true) { + try (InputStream in = store.retrieve( + key, readBuffer.getByteStart(), readBuffer.getByteEnd())) { + IOUtils.readFully(in, readBuffer.getBuffer(), + 0, readBuffer.getBuffer().length); + readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS); + break; + } catch (Exception e) { + LOG.warn("Exception thrown when retrieve key: " + + this.key + ", exception: " + e); + try { + RetryPolicy.RetryAction rc = retryPolicy.shouldRetry( + e, retries++, 0, true); + if (rc.action == RetryPolicy.RetryAction.RetryDecision.RETRY) { + Thread.sleep(rc.delayMillis); + } else { + //should not retry + break; + } + } catch (Exception ex) { + //FAIL + LOG.warn("Exception thrown when call shouldRetry, exception " + ex); + break; + } + } + } + + if (readBuffer.getStatus() != ReadBuffer.STATUS.SUCCESS) { + readBuffer.setStatus(ReadBuffer.STATUS.ERROR); + } + + //notify main thread which wait for this buffer + readBuffer.signalAll(); + } finally { + readBuffer.unlock(); + } + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index 41d475dc0b..afff2237af 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -24,7 +24,9 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -41,12 +43,14 @@ import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; +import 
org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.Progressable; import com.aliyun.oss.model.OSSObjectSummary; import com.aliyun.oss.model.ObjectListing; import com.aliyun.oss.model.ObjectMetadata; +import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +69,9 @@ public class AliyunOSSFileSystem extends FileSystem { private Path workingDir; private AliyunOSSFileSystemStore store; private int maxKeys; + private int maxReadAheadPartNumber; + private ListeningExecutorService boundedThreadPool; + private static final PathFilter DEFAULT_FILTER = new PathFilter() { @Override public boolean accept(Path file) { @@ -82,6 +89,7 @@ public FSDataOutputStream append(Path path, int bufferSize, public void close() throws IOException { try { store.close(); + boundedThreadPool.shutdown(); } finally { super.close(); } @@ -309,10 +317,24 @@ public void initialize(URI name, Configuration conf) throws IOException { store = new AliyunOSSFileSystemStore(); store.initialize(name, conf, statistics); maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); + + int threadNum = AliyunOSSUtils.intPositiveOption(conf, + Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY, + Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT); + + int totalTasks = AliyunOSSUtils.intPositiveOption(conf, + Constants.MAX_TOTAL_TASKS_KEY, Constants.MAX_TOTAL_TASKS_DEFAULT); + + maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf, + Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY, + Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT); + + this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( + threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared"); setConf(conf); } - /** +/** * Turn a path (relative or otherwise) into an OSS key. * * @param path the path of the file. 
@@ -523,8 +545,11 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { " because it is a directory"); } - return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store, - pathToKey(path), fileStatus.getLen(), statistics)); + return new FSDataInputStream(new AliyunOSSInputStream(getConf(), + new SemaphoredDelegatingExecutor( + boundedThreadPool, maxReadAheadPartNumber, true), + maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(), + statistics)); } @Override diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java index 72ba61925a..494ac534a0 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java @@ -20,8 +20,11 @@ import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,20 +46,33 @@ public class AliyunOSSInputStream extends FSInputStream { private final String key; private Statistics statistics; private boolean closed; - private InputStream wrappedStream = null; private long contentLength; private long position; private long partRemaining; + private byte[] buffer; + private int maxReadAheadPartNumber; + private long expectNextPos; + private long lastByteStart; + + private ExecutorService readAheadExecutorService; + private Queue readBufferQueue = new ArrayDeque<>(); public AliyunOSSInputStream(Configuration conf, + ExecutorService readAheadExecutorService, int maxReadAheadPartNumber, 
AliyunOSSFileSystemStore store, String key, Long contentLength, Statistics statistics) throws IOException { + this.readAheadExecutorService = + MoreExecutors.listeningDecorator(readAheadExecutorService); this.store = store; this.key = key; this.statistics = statistics; this.contentLength = contentLength; downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT); + this.maxReadAheadPartNumber = maxReadAheadPartNumber; + + this.expectNextPos = 0; + this.lastByteStart = -1; reopen(0); closed = false; } @@ -82,15 +98,81 @@ private synchronized void reopen(long pos) throws IOException { partSize = downloadPartSize; } - if (wrappedStream != null) { + if (this.buffer != null) { if (LOG.isDebugEnabled()) { LOG.debug("Aborting old stream to open at pos " + pos); } - wrappedStream.close(); + this.buffer = null; } - wrappedStream = store.retrieve(key, pos, pos + partSize -1); - if (wrappedStream == null) { + boolean isRandomIO = true; + if (pos == this.expectNextPos) { + isRandomIO = false; + } else { + //new seek, remove cache buffers if its byteStart is not equal to pos + while (readBufferQueue.size() != 0) { + if (readBufferQueue.element().getByteStart() != pos) { + readBufferQueue.poll(); + } else { + break; + } + } + } + + this.expectNextPos = pos + partSize; + + int currentSize = readBufferQueue.size(); + if (currentSize == 0) { + //init lastByteStart to pos - partSize, used by for loop below + lastByteStart = pos - partSize; + } else { + ReadBuffer[] readBuffers = readBufferQueue.toArray( + new ReadBuffer[currentSize]); + lastByteStart = readBuffers[currentSize - 1].getByteStart(); + } + + int maxLen = this.maxReadAheadPartNumber - currentSize; + for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) { + if (lastByteStart + partSize * (i + 1) > contentLength) { + break; + } + + long byteStart = lastByteStart + partSize * (i + 1); + long byteEnd = byteStart + partSize -1; + if (byteEnd >= contentLength) { + byteEnd = 
contentLength - 1; + } + + ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd); + if (readBuffer.getBuffer().length == 0) { + //EOF + readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS); + } else { + this.readAheadExecutorService.execute( + new AliyunOSSFileReaderTask(key, store, readBuffer)); + } + readBufferQueue.add(readBuffer); + if (isRandomIO) { + break; + } + } + + ReadBuffer readBuffer = readBufferQueue.poll(); + readBuffer.lock(); + try { + readBuffer.await(ReadBuffer.STATUS.INIT); + if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) { + this.buffer = null; + } else { + this.buffer = readBuffer.getBuffer(); + } + } catch (InterruptedException e) { + LOG.warn("interrupted when wait a read buffer"); + } finally { + readBuffer.unlock(); + } + + if (this.buffer == null) { throw new IOException("Null IO stream"); } position = pos; @@ -105,18 +187,10 @@ public synchronized int read() throws IOException { reopen(position); } - int tries = MAX_RETRIES; - boolean retry; int byteRead = -1; - do { - retry = false; - try { - byteRead = wrappedStream.read(); - } catch (Exception e) { - handleReadException(e, --tries); - retry = true; - } - } while (retry); + if (partRemaining != 0) { + byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF; + } if (byteRead >= 0) { position++; partRemaining--; @@ -161,21 +235,18 @@ public synchronized int read(byte[] buf, int off, int len) reopen(position); } - int tries = MAX_RETRIES; - boolean retry; - int bytes = -1; - do { - retry = false; - try { - bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead); - } catch (Exception e) { - handleReadException(e, --tries); - retry = true; + int bytes = 0; + for (int i = this.buffer.length - (int)partRemaining; + i < this.buffer.length; i++) { + buf[off + bytesRead] = this.buffer[i]; + bytes++; + bytesRead++; + if (off + bytesRead >= len) { + break; } - } while (retry); + } if (bytes > 0) { - bytesRead += bytes; position += bytes; partRemaining -= bytes; } 
else if (partRemaining != 0) { @@ -202,9 +273,7 @@ public synchronized void close() throws IOException { return; } closed = true; - if (wrappedStream != null) { - wrappedStream.close(); - } + this.buffer = null; } @Override @@ -225,7 +294,6 @@ public synchronized void seek(long pos) throws IOException { return; } else if (pos > position && pos < position + partRemaining) { long len = pos - position; - AliyunOSSUtils.skipFully(wrappedStream, len); position = pos; partRemaining -= len; } else { @@ -245,18 +313,7 @@ public boolean seekToNewSource(long targetPos) throws IOException { return false; } - private void handleReadException(Exception e, int tries) throws IOException{ - if (tries == 0) { - throw new IOException(e); - } - - LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" + - " connection at position '" + position + "', " + e.getMessage()); - try { - Thread.sleep(100); - } catch (InterruptedException e2) { - LOG.warn(e2.getMessage()); - } - reopen(position); + public long getExpectNextPos() { + return this.expectNextPos; } } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java index fdf72e48c0..1a2160889a 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java @@ -40,6 +40,18 @@ final public class AliyunOSSUtils { private AliyunOSSUtils() { } + public static int intPositiveOption( + Configuration conf, String key, int defVal) { + int v = conf.getInt(key, defVal); + if (v <= 0) { + LOG.warn(key + " is configured to " + v + + ", will use default value: " + defVal); + v = defVal; + } + + return v; + } + /** * Used to get password from configuration. 
* diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java index dd71842fb8..410adc9037 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java @@ -97,7 +97,18 @@ private Constants() { public static final String MULTIPART_DOWNLOAD_SIZE_KEY = "fs.oss.multipart.download.size"; - public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024; + public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024; + + public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY = + "fs.oss.multipart.download.threads"; + public static final int MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT = 10; + + public static final String MAX_TOTAL_TASKS_KEY = "fs.oss.max.total.tasks"; + public static final int MAX_TOTAL_TASKS_DEFAULT = 128; + + public static final String MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY = + "fs.oss.multipart.download.ahead.part.max.number"; + public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4; // Comma separated list of directories public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir"; diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java new file mode 100644 index 0000000000..46bb5bf079 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This class is used by {@link AliyunOSSInputStream} + * and {@link AliyunOSSFileReaderTask} to buffer data that is read from OSS. + */ +public class ReadBuffer { + enum STATUS { + INIT, SUCCESS, ERROR + } + private final ReentrantLock lock = new ReentrantLock(); + + private Condition readyCondition = lock.newCondition(); + + private byte[] buffer; + private STATUS status; + private long byteStart; + private long byteEnd; + + public ReadBuffer(long byteStart, long byteEnd) { + this.buffer = new byte[(int)(byteEnd - byteStart) + 1]; + + this.status = STATUS.INIT; + this.byteStart = byteStart; + this.byteEnd = byteEnd; + } + + public void lock() { + lock.lock(); + } + + public void unlock() { + lock.unlock(); + } + + public void await(STATUS waitStatus) throws InterruptedException { + while (this.status == waitStatus) { + readyCondition.await(); + } + } + + public void signalAll() { + readyCondition.signalAll(); + } + + public byte[] getBuffer() { + return buffer; + } + + public STATUS getStatus() { + return status; + } + + public void setStatus(STATUS status) { + this.status = status; + } + + public long getByteStart() { + return byteStart; + } + + public long getByteEnd() { + return byteEnd; + } +} diff --git 
a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java index 10c4edda04..66068c6362 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java @@ -35,6 +35,7 @@ import java.util.Random; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** @@ -107,6 +108,54 @@ public void testSeekFile() throws Exception { IOUtils.closeStream(instream); } + @Test + public void testSequentialAndRandomRead() throws Exception { + Path smallSeekFile = setPath("/test/smallSeekFile.txt"); + long size = 5 * 1024 * 1024; + + ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255); + LOG.info("5MB file created: smallSeekFile.txt"); + + FSDataInputStream fsDataInputStream = this.fs.open(smallSeekFile); + AliyunOSSInputStream in = + (AliyunOSSInputStream)fsDataInputStream.getWrappedStream(); + assertTrue("expected position at:" + 0 + ", but got:" + + fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0); + + assertTrue("expected position at:" + + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:" + + in.getExpectNextPos(), + in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT); + fsDataInputStream.seek(4 * 1024 * 1024); + assertTrue("expected position at:" + 4 * 1024 * 1024 + + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:" + + in.getExpectNextPos(), + in.getExpectNextPos() == 4 * 1024 * 1024 + + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT); + IOUtils.closeStream(fsDataInputStream); + } + + @Test + public void testOSSFileReaderTask() throws Exception { + Path smallSeekFile = setPath("/test/smallSeekFileOSSFileReader.txt"); + long size = 5 * 1024 * 1024; + + 
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255); + LOG.info("5MB file created: smallSeekFileOSSFileReader.txt"); + ReadBuffer readBuffer = new ReadBuffer(12, 24); + AliyunOSSFileReaderTask task = new AliyunOSSFileReaderTask("1", + ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer); + //NullPointerException, fail + task.run(); + assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.ERROR); + //OK + task = new AliyunOSSFileReaderTask( + "test/test/smallSeekFileOSSFileReader.txt", + ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer); + task.run(); + assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.SUCCESS); + } + @Test public void testReadFile() throws Exception { final int bufLen = 256;