diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java index d9a3f5830d..132d38526b 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.aliyun.oss; import com.aliyun.oss.model.PartETag; +import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; @@ -27,17 +28,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; /** * Asynchronous multi-part based uploading mechanism to support huge file @@ -49,71 +48,103 @@ public class AliyunOSSBlockOutputStream extends OutputStream { LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class); private AliyunOSSFileSystemStore store; private Configuration conf; - private boolean closed; + private final AtomicBoolean closed = new AtomicBoolean(false); private String key; - private File blockFile; - private Map blockFiles = new HashMap<>(); - private long blockSize; + private int blockSize; private int blockId = 0; private long blockWritten = 0L; private String uploadId = null; private final List> partETagsFutures; + private final OSSDataBlocks.BlockFactory blockFactory; + private final BlockOutputStreamStatistics statistics; + private OSSDataBlocks.DataBlock activeBlock; private final ListeningExecutorService executorService; - private OutputStream blockStream; private final byte[] singleByte = new byte[1]; public AliyunOSSBlockOutputStream(Configuration conf, AliyunOSSFileSystemStore store, String key, - Long blockSize, + int blockSize, + OSSDataBlocks.BlockFactory blockFactory, + BlockOutputStreamStatistics statistics, ExecutorService executorService) throws IOException { this.store = store; this.conf = conf; this.key = key; this.blockSize = blockSize; - this.blockFile = newBlockFile(); - this.blockStream = - new BufferedOutputStream(new FileOutputStream(blockFile)); + this.blockFactory = blockFactory; + this.statistics = statistics; this.partETagsFutures = new ArrayList<>(2); this.executorService = MoreExecutors.listeningDecorator(executorService); } - private File newBlockFile() throws IOException { - return AliyunOSSUtils.createTmpFileForWrite( - String.format("oss-block-%04d-", blockId), blockSize, conf); + /** + * Demand create a destination block. + * @return the active block; null if there isn't one. 
+ * @throws IOException on any failure to create + */ + private synchronized OSSDataBlocks.DataBlock createBlockIfNeeded() + throws IOException { + if (activeBlock == null) { + blockId++; + activeBlock = blockFactory.create(blockId, blockSize, statistics); + } + return activeBlock; } + /** + * Check for the filesystem being open. + * @throws IOException if the filesystem is closed. + */ + void checkOpen() throws IOException { + if (closed.get()) { + throw new IOException("Stream closed."); + } + } + + /** + * The flush operation does not trigger an upload; that awaits + * the next block being full. What it does do is call {@code flush() } + * on the current block, leaving it to choose how to react. + * @throws IOException Any IO problem. + */ @Override public synchronized void flush() throws IOException { - blockStream.flush(); + checkOpen(); + + OSSDataBlocks.DataBlock dataBlock = getActiveBlock(); + if (dataBlock != null) { + dataBlock.flush(); + } } @Override public synchronized void close() throws IOException { - if (closed) { + if (closed.get()) { + // already closed + LOG.debug("Ignoring close() as stream is already closed"); return; } - blockStream.flush(); - blockStream.close(); - if (!blockFiles.values().contains(blockFile)) { - blockId++; - blockFiles.put(blockId, blockFile); - } - try { - if (blockFiles.size() == 1) { + if (uploadId == null) { // just upload it directly - store.uploadObject(key, blockFile); + OSSDataBlocks.DataBlock dataBlock = getActiveBlock(); + if (dataBlock == null) { + // zero size file + store.storeEmptyFile(key); + } else { + OSSDataBlocks.BlockUploadData uploadData = dataBlock.startUpload(); + if (uploadData.hasFile()) { + store.uploadObject(key, uploadData.getFile()); + } else { + store.uploadObject(key, + uploadData.getUploadStream(), dataBlock.dataSize()); + } + } } else { if (blockWritten > 0) { - ListenableFuture partETagFuture = - executorService.submit(() -> { - PartETag partETag = store.uploadPart(blockFile, key, uploadId, - blockId); - return partETag; - }); - partETagsFutures.add(partETagFuture); + uploadCurrentBlock(); } // wait for the partial uploads to finish final List partETags = waitForAllPartUploads(); @@ -124,8 +155,8 @@ public synchronized void close() throws IOException { new ArrayList<>(partETags)); } } finally { - removeTemporaryFiles(); - closed = true; + cleanupWithLogger(LOG, getActiveBlock(), blockFactory); + closed.set(true); } } @@ -138,64 +169,82 @@ public synchronized void write(int b) throws IOException { @Override public synchronized void write(byte[] b, int off, int len) throws IOException { - if (closed) { - throw new IOException("Stream closed."); + int totalWritten = 0; + while (totalWritten < len) { + int written = internalWrite(b, off + totalWritten, len - totalWritten); + totalWritten += written; + LOG.debug("Buffer len {}, written {}, total written {}", + len, written, totalWritten); } - blockStream.write(b, off, len); - blockWritten += len; - if (blockWritten >= blockSize) { - uploadCurrentPart(); - blockWritten = 0L; + } + private synchronized int internalWrite(byte[] b, int off, int len) + throws IOException { + OSSDataBlocks.validateWriteArgs(b, off, len); + checkOpen(); + if (len == 0) { + return 0; + } + OSSDataBlocks.DataBlock block = createBlockIfNeeded(); + int written = block.write(b, off, len); + blockWritten += written; + int remainingCapacity = block.remainingCapacity(); + if (written < len) { + // not everything was written — the block has run out + // of capacity + // Trigger an upload then process 
the remainder. + LOG.debug("writing more data than block has capacity - triggering upload"); + uploadCurrentBlock(); + } else { + if (remainingCapacity == 0) { + // the whole buffer is done, trigger an upload + uploadCurrentBlock(); + } + } + return written; + } + + /** + * Clear the active block. + */ + private void clearActiveBlock() { + if (activeBlock != null) { + LOG.debug("Clearing active block"); + } + synchronized (this) { + activeBlock = null; + } } - private void removeTemporaryFiles() { - for (File file : blockFiles.values()) { - if (file != null && file.exists() && !file.delete()) { - LOG.warn("Failed to delete temporary file {}", file); - } - } + private synchronized OSSDataBlocks.DataBlock getActiveBlock() { + return activeBlock; } - private void removePartFiles() throws IOException { - for (ListenableFuture partETagFuture : partETagsFutures) { - if (!partETagFuture.isDone()) { - continue; - } - - try { - File blockFile = blockFiles.get(partETagFuture.get().getPartNumber()); - if (blockFile != null && blockFile.exists() && !blockFile.delete()) { - LOG.warn("Failed to delete temporary file {}", blockFile); - } - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); - } - } - } - - private void uploadCurrentPart() throws IOException { - blockStream.flush(); - blockStream.close(); - if (blockId == 0) { + private void uploadCurrentBlock() + throws IOException { + if (uploadId == null) { uploadId = store.getUploadId(key); } - blockId++; - blockFiles.put(blockId, blockFile); - - File currentFile = blockFile; int currentBlockId = blockId; - ListenableFuture partETagFuture = - executorService.submit(() -> { - PartETag partETag = store.uploadPart(currentFile, key, uploadId, - currentBlockId); - return partETag; - }); - partETagsFutures.add(partETagFuture); - removePartFiles(); - blockFile = newBlockFile(); - blockStream = new BufferedOutputStream(new FileOutputStream(blockFile)); + OSSDataBlocks.DataBlock dataBlock = getActiveBlock(); + long size = dataBlock.dataSize(); + OSSDataBlocks.BlockUploadData uploadData = dataBlock.startUpload(); + try { + ListenableFuture partETagFuture = + executorService.submit(() -> { + try { + PartETag partETag = store.uploadPart(uploadData, size, key, + uploadId, currentBlockId); + return partETag; + } finally { + cleanupWithLogger(LOG, uploadData, dataBlock); + } + }); + partETagsFutures.add(partETagFuture); + } finally { + blockWritten = 0; + clearActiveBlock(); + } } /** diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index 5f40488bfd..c41940fde9 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -27,6 +27,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics; +import org.apache.hadoop.fs.aliyun.oss.statistics.impl.OutputStreamStatistics; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -71,6 +74,9 @@ public class AliyunOSSFileSystem extends FileSystem { private String bucket; private String
username; private Path workingDir; + private OSSDataBlocks.BlockFactory blockFactory; + private BlockOutputStreamStatistics blockOutputStreamStatistics; + private int uploadPartSize; private int blockOutputActiveBlocks; private AliyunOSSFileSystemStore store; private int maxKeys; @@ -128,13 +134,13 @@ public FSDataOutputStream create(Path path, FsPermission permission, // this means the file is not found } - long uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(getConf(), - MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT); return new FSDataOutputStream( new AliyunOSSBlockOutputStream(getConf(), store, key, uploadPartSize, + blockFactory, + blockOutputStreamStatistics, new SemaphoredDelegatingExecutor(boundedThreadPool, blockOutputActiveBlocks, true)), statistics); } @@ -334,6 +340,7 @@ public String getCanonicalServiceName() { */ public void initialize(URI name, Configuration conf) throws IOException { super.initialize(name, conf); + setConf(conf); bucket = name.getHost(); uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority()); @@ -345,6 +352,16 @@ public void initialize(URI name, Configuration conf) throws IOException { blockOutputActiveBlocks = intOption(conf, UPLOAD_ACTIVE_BLOCKS_KEY, UPLOAD_ACTIVE_BLOCKS_DEFAULT, 1); + uploadPartSize = (int)AliyunOSSUtils.getMultipartSizeProperty(conf, + MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT); + String uploadBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, + DEFAULT_FAST_UPLOAD_BUFFER); + + blockOutputStreamStatistics = new OutputStreamStatistics(); + blockFactory = OSSDataBlocks.createFactory(this, uploadBuffer); + LOG.debug("Using OSSBlockOutputStream with buffer = {}; block={};" + + " queue limit={}", + uploadBuffer, uploadPartSize, blockOutputActiveBlocks); store = new AliyunOSSFileSystemStore(); store.initialize(name, conf, username, statistics); maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); @@ -379,8 +396,6 @@ public void initialize(URI name, Configuration conf) throws IOException { this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance( maxCopyThreads, maxCopyTasks, 60L, TimeUnit.SECONDS, "oss-copy-unbounded"); - - setConf(conf); } /** @@ -757,4 +772,14 @@ public void setWorkingDirectory(Path dir) { public AliyunOSSFileSystemStore getStore() { return store; } + + @VisibleForTesting + OSSDataBlocks.BlockFactory getBlockFactory() { + return blockFactory; + } + + @VisibleForTesting + BlockOutputStreamStatistics getBlockOutputStreamStatistics() { + return blockOutputStreamStatistics; + } } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java index 156af04bab..6e0c7dc7e4 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -422,6 +422,27 @@ public void uploadObject(String key, File file) throws IOException { } } + /** + * Upload an input stream as an OSS object, using single upload. + * @param key object key. + * @param in input stream to upload. + * @param size size of the input stream. + * @throws IOException if failed to upload object. 
+ */ + public void uploadObject(String key, InputStream in, long size) + throws IOException { + ObjectMetadata meta = new ObjectMetadata(); + meta.setContentLength(size); + + if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + + PutObjectResult result = ossClient.putObject(bucketName, key, in, meta); + LOG.debug(result.getETag()); + statistics.incrementWriteOps(1); + } + /** * list objects. * @@ -652,44 +673,58 @@ private boolean continueListStatus() { }; } + public PartETag uploadPart(OSSDataBlocks.BlockUploadData partData, + long size, String key, String uploadId, int idx) throws IOException { + if (partData.hasFile()) { + return uploadPart(partData.getFile(), key, uploadId, idx); + } else { + return uploadPart(partData.getUploadStream(), size, key, uploadId, idx); + } + } + public PartETag uploadPart(File file, String key, String uploadId, int idx) throws IOException { - InputStream instream = null; + InputStream in = new FileInputStream(file); + try { + return uploadPart(in, file.length(), key, uploadId, idx); + } finally { + in.close(); + } + } + + public PartETag uploadPart(InputStream in, long size, String key, + String uploadId, int idx) throws IOException { Exception caught = null; int tries = 3; while (tries > 0) { try { - instream = new FileInputStream(file); UploadPartRequest uploadRequest = new UploadPartRequest(); uploadRequest.setBucketName(bucketName); uploadRequest.setKey(key); uploadRequest.setUploadId(uploadId); - uploadRequest.setInputStream(instream); - uploadRequest.setPartSize(file.length()); + uploadRequest.setInputStream(in); + uploadRequest.setPartSize(size); uploadRequest.setPartNumber(idx); UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest); statistics.incrementWriteOps(1); return uploadResult.getPartETag(); } catch (Exception e) { - LOG.debug("Failed to upload "+ file.getPath() +", " + + LOG.debug("Failed to upload " + key + ", part " + idx + ", " + "try again.", e); caught = e; - } finally { - if (instream != null) { - instream.close(); - instream = null; - } } tries--; } assert (caught != null); - throw new IOException("Failed to upload " + file.getPath() + + throw new IOException("Failed to upload " + key + + ", part " + idx + " for 3 times.", caught); } /** * Initiate multipart upload. + * @param key object key. + * @return upload id. */ public String getUploadId(String key) { InitiateMultipartUploadRequest initiateMultipartUploadRequest = @@ -701,6 +736,10 @@ public String getUploadId(String key) { /** * Complete the specific multipart upload. + * @param key object key. + * @param uploadId upload id of this multipart upload. + * @param partETags part etags needed to complete the upload. + * @return CompleteMultipartUploadResult. */ public CompleteMultipartUploadResult completeMultipartUpload(String key, String uploadId, List partETags) { @@ -713,6 +752,8 @@ public CompleteMultipartUploadResult completeMultipartUpload(String key, /** * Abort the specific multipart upload. + * @param key object key. + * @param uploadId upload id of this multipart upload.
*/ public void abortMultipartUpload(String key, String uploadId) { AbortMultipartUploadRequest request = new AbortMultipartUploadRequest( diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java index 3421b42181..baeb919937 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java @@ -134,6 +134,59 @@ private Constants() { // Comma separated list of directories public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir"; + /** + * What buffer to use. + * Default is {@link #FAST_UPLOAD_BUFFER_DISK}. + * Value: {@value} + */ + public static final String FAST_UPLOAD_BUFFER = + "fs.oss.fast.upload.buffer"; + + /** + * Buffer blocks to disk: {@value}. + * Capacity is limited to available disk space. + */ + public static final String FAST_UPLOAD_BUFFER_DISK = "disk"; + + /** + * Use an in-memory array. Fast, but will run out of heap rapidly: {@value}. + */ + public static final String FAST_UPLOAD_BUFFER_ARRAY = "array"; + + /** + * Use a byte buffer. May be more memory efficient than the + * {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}. + */ + public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer"; + + /** + * Use an in-memory array and fall back to disk if + * used memory exceeds the quota. + */ + public static final String FAST_UPLOAD_BUFFER_ARRAY_DISK = "array_disk"; + + /** + * Use a byte buffer and fall back to disk if + * used memory exceeds the quota. + */ + public static final String FAST_UPLOAD_BYTEBUFFER_DISK = "bytebuffer_disk"; + + /** + * Memory limit of {@link #FAST_UPLOAD_BUFFER_ARRAY_DISK} or + * {@link #FAST_UPLOAD_BYTEBUFFER_DISK}. + */ + public static final String FAST_UPLOAD_BUFFER_MEMORY_LIMIT = + "fs.oss.fast.upload.memory.limit"; + + public static final long FAST_UPLOAD_BUFFER_MEMORY_LIMIT_DEFAULT = + 1024 * 1024 * 1024; // 1GB + + /** + * Default buffer option: {@value}. + */ + public static final String DEFAULT_FAST_UPLOAD_BUFFER = + FAST_UPLOAD_BUFFER_DISK; + // private | public-read | public-read-write public static final String CANNED_ACL_KEY = "fs.oss.acl.default"; public static final String CANNED_ACL_DEFAULT = ""; diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/OSSDataBlocks.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/OSSDataBlocks.java new file mode 100644 index 0000000000..048f8b7ec3 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/OSSDataBlocks.java @@ -0,0 +1,1109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.aliyun.oss; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics; +import org.apache.hadoop.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.util.DirectBufferPool; + +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + +/** + * Set of classes to support output streaming into blocks which are then + * uploaded to OSS as a single PUT, or as part of a multipart request. + */ +final class OSSDataBlocks { + private static final Logger LOG = + LoggerFactory.getLogger(OSSDataBlocks.class); + + private OSSDataBlocks() { + } + + /** + * Validate args to a write command. These are the same validation checks + * expected for any implementation of {@code OutputStream.write()}. + * @param b byte array containing data + * @param off offset in array where to start + * @param len number of bytes to be written + * @throws NullPointerException for a null buffer + * @throws IndexOutOfBoundsException if indices are out of range + */ + static void validateWriteArgs(byte[] b, int off, int len) + throws IOException { + Preconditions.checkNotNull(b); + if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException( + "write (b[" + b.length + "], " + off + ", " + len + ')'); + } + } + + /** + * Create a factory. + * @param owner factory owner + * @param name factory name - the option from {@link Constants}. + * @return the factory, ready to be initialized. + * @throws IllegalArgumentException if the name is unknown. + */ + static BlockFactory createFactory(AliyunOSSFileSystem owner, + String name) { + switch (name) { + case Constants.FAST_UPLOAD_BUFFER_ARRAY: + return new ArrayBlockFactory(owner); + case Constants.FAST_UPLOAD_BUFFER_DISK: + return new DiskBlockFactory(owner); + case Constants.FAST_UPLOAD_BYTEBUFFER: + return new ByteBufferBlockFactory(owner); + case Constants.FAST_UPLOAD_BUFFER_ARRAY_DISK: + return new MemoryAndDiskBlockFactory( + owner, new ArrayBlockFactory(owner)); + case Constants.FAST_UPLOAD_BYTEBUFFER_DISK: + return new MemoryAndDiskBlockFactory( + owner, new ByteBufferBlockFactory(owner)); + default: + throw new IllegalArgumentException("Unsupported block buffer" + + " \"" + name + '"'); + } + } + + /** + * The output information for an upload. + * It can be either a file or an input stream. + * When closed, any stream is closed. Any source file is untouched. + */ + static final class BlockUploadData implements Closeable { + private final File file; + private final InputStream uploadStream; + + /** + * File constructor; input stream will be null.
+ * @param file file to upload + */ + BlockUploadData(File file) { + Preconditions.checkArgument(file.exists(), "No file: " + file); + this.file = file; + this.uploadStream = null; + } + + /** + * Stream constructor, file field will be null. + * @param uploadStream stream to upload + */ + BlockUploadData(InputStream uploadStream) { + Preconditions.checkNotNull(uploadStream, "rawUploadStream"); + this.uploadStream = uploadStream; + this.file = null; + } + + /** + * Predicate: does this instance contain a file reference. + * @return true if there is a file. + */ + boolean hasFile() { + return file != null; + } + + /** + * Get the file, if there is one. + * @return the file for uploading, or null. + */ + File getFile() { + return file; + } + + /** + * Get the raw upload stream, if the object was + * created with one. + * @return the upload stream or null. + */ + InputStream getUploadStream() { + return uploadStream; + } + + /** + * Close: closes any upload stream provided in the constructor. + * @throws IOException inherited exception + */ + @Override + public void close() throws IOException { + cleanupWithLogger(LOG, uploadStream); + } + } + + /** + * Base class for block factories. + */ + static abstract class BlockFactory implements Closeable { + private final AliyunOSSFileSystem owner; + + protected BlockFactory(AliyunOSSFileSystem owner) { + this.owner = owner; + } + + /** + * Create a block. + * + * @param index index of block + * @param limit limit of the block + * @param statistics stats to work with + * @return a new block. + */ + abstract DataBlock create(long index, int limit, + BlockOutputStreamStatistics statistics) throws IOException; + + /** + * Implement any close/cleanup operation. + * Base class is a no-op. + * @throws IOException Inherited exception; implementations should + * avoid raising it. + */ + @Override + public void close() throws IOException { + } + + /** + * Get the owner. + * @return the owner filesystem. + */ + protected AliyunOSSFileSystem getOwner() { + return owner; + } + } + + /** + * This represents a block being uploaded. + */ + static abstract class DataBlock implements Closeable { + + enum DestState {Writing, Upload, Closed} + + private volatile DestState state = DestState.Writing; + private final long index; + private final BlockOutputStreamStatistics statistics; + + protected DataBlock(long index, + BlockOutputStreamStatistics statistics) { + this.index = index; + this.statistics = statistics; + } + + /** + * Atomically enter a state, verifying current state. + * @param current current state. null means "no check" + * @param next next state + * @throws IllegalStateException if the current state is not as expected + */ + protected synchronized final void enterState(DestState current, + DestState next) + throws IllegalStateException { + verifyState(current); + LOG.debug("{}: entering state {}", this, next); + state = next; + } + + /** + * Verify that the block is in the declared state. + * @param expected expected state. + * @throws IllegalStateException if the DataBlock is in the wrong state + */ + protected final void verifyState(DestState expected) + throws IllegalStateException { + if (expected != null && state != expected) { + throw new IllegalStateException("Expected stream state " + expected + + " - but actual state is " + state + " in " + this); + } + } + + /** + * Current state. + * @return the current state. + */ + final DestState getState() { + return state; + } + + /** + * Get the index; used by subclasses. + * @return the block index.
+ */ + final long getIndex() { + return index; + } + + /** + * Return the current data size. + * @return the size of the data + */ + abstract int dataSize(); + + /** + * Predicate to verify that the block has the capacity to write + * the given set of bytes. + * @param bytes number of bytes desired to be written. + * @return true if there is enough space. + */ + abstract boolean hasCapacity(long bytes); + + /** + * Predicate to check if there is data in the block. + * @return true if there is data. + */ + boolean hasData() { + return dataSize() > 0; + } + + /** + * The remaining capacity in the block before it is full. + * @return the number of bytes remaining. + */ + abstract int remainingCapacity(); + + /** + * Write a series of bytes from the buffer, from the offset. + * Returns the number of bytes written. + * Only valid in the state {@code Writing}. + * Base class verifies the state but does no writing. + * @param buffer buffer + * @param offset offset + * @param length length of write + * @return number of bytes written + * @throws IOException trouble + */ + int write(byte[] buffer, int offset, int length) throws IOException { + verifyState(DestState.Writing); + Preconditions.checkArgument(buffer != null, "Null buffer"); + Preconditions.checkArgument(length >= 0, "length is negative"); + Preconditions.checkArgument(offset >= 0, "offset is negative"); + Preconditions.checkArgument( + !(buffer.length - offset < length), + "buffer shorter than amount of data to write"); + return 0; + } + + /** + * Flush the output. + * Only valid in the state {@code Writing}. + * In the base class, this is a no-op. + * @throws IOException any IO problem. + */ + void flush() throws IOException { + verifyState(DestState.Writing); + } + + /** + * Switch to the upload state and return a stream for uploading. + * Base class calls {@link #enterState(DestState, DestState)} to + * manage the state machine. + * @return the stream + * @throws IOException trouble + */ + BlockUploadData startUpload() throws IOException { + LOG.debug("Start datablock[{}] upload", index); + enterState(DestState.Writing, DestState.Upload); + return null; + } + + /** + * Enter the closed state. + * @return true if the class was in any other state, implying that + * the subclass should do its close operations + */ + protected synchronized boolean enterClosedState() { + if (!state.equals(DestState.Closed)) { + enterState(null, DestState.Closed); + return true; + } else { + return false; + } + } + + @Override + public void close() throws IOException { + if (enterClosedState()) { + LOG.debug("Closed {}", this); + innerClose(); + } + } + + /** + * Inner close logic for subclasses to implement. + */ + protected void innerClose() throws IOException { + } + + /** + * A block has been allocated. + */ + protected void blockAllocated() { + if (statistics != null) { + statistics.blockAllocated(); + } + } + + /** + * A block has been released. + */ + protected void blockReleased() { + if (statistics != null) { + statistics.blockReleased(); + } + } + + /** + * A disk block has been allocated. + */ + protected void diskBlockAllocated() { + if (statistics != null) { + statistics.diskBlockAllocated(); + } + } + + /** + * A disk block has been released. + */ + protected void diskBlockReleased() { + if (statistics != null) { + statistics.diskBlockReleased(); + } + } + + /** + * Memory bytes have been allocated.
+ */ + protected void bytesAllocated(long size) { + if (statistics != null) { + statistics.bytesAllocated(size); + } + } + + /** + * Memory bytes have been released. + */ + protected void bytesReleased(long size) { + if (statistics != null) { + statistics.bytesReleased(size); + } + } + + protected BlockOutputStreamStatistics getStatistics() { + return statistics; + } + } + + // ==================================================================== + + static class MemoryLimitException extends IOException { + MemoryLimitException(String msg) { + super(msg); + } + } + + static abstract class MemoryBlockFactory extends BlockFactory { + private final AtomicLong memoryUsed = new AtomicLong(0); + private long memoryLimit = 0; + private boolean checkMemory = false; + + MemoryBlockFactory(AliyunOSSFileSystem owner) { + super(owner); + } + + void setMemoryLimit(long memoryLimit) { + this.memoryLimit = memoryLimit; + if (memoryLimit > 0) { + checkMemory = true; + } + } + + void allocateMemory(long size) throws MemoryLimitException { + if (!checkMemory) { + return; + } + long next = memoryUsed.addAndGet(size); + if (next > memoryLimit) { + memoryUsed.getAndAdd(-size); + String msg = "Cannot allocate memory" + + ", memory used " + memoryUsed + + ", allocate size " + size + + ", memory limit " + memoryLimit; + throw new MemoryLimitException(msg); + } + } + + void releaseMemory(long size) { + if (!checkMemory) { + return; + } + memoryUsed.getAndAdd(-size); + } + + long getMemoryUsed() { + return memoryUsed.get(); + } + } + + /** + * Use byte arrays on the heap for storage. + */ + static class ArrayBlockFactory extends MemoryBlockFactory { + + ArrayBlockFactory(AliyunOSSFileSystem owner) { + super(owner); + } + + @Override + DataBlock create(long index, int limit, + BlockOutputStreamStatistics statistics) + throws IOException { + try { + return new ByteArrayBlock(index, limit, statistics); + } catch (MemoryLimitException e) { + LOG.debug(e.getMessage() + ", index " + index); + return null; + } + } + + static class OSSByteArrayOutputStream extends ByteArrayOutputStream { + + OSSByteArrayOutputStream(int size) { + super(size); + } + + /** + * Get an InputStream backed by the internal byte array; + * the buffer is reset and released so it cannot be written to again. + * @return an input stream over the block's data. + */ + ByteArrayInputStream getInputStream() { + ByteArrayInputStream bin = new ByteArrayInputStream(this.buf, 0, count); + this.reset(); + this.buf = null; + return bin; + } + } + + /** + * Stream to memory via a {@code ByteArrayOutputStream}. + *

+ * This has the problem: it can consume a lot of heap space + * proportional to the mismatch between writes to the stream and + * the JVM-wide upload bandwidth to the OSS endpoint. + * The memory consumption can be limited by tuning the filesystem settings + * to restrict the number of queued/active uploads. + */ + class ByteArrayBlock extends DataBlock { + private OSSByteArrayOutputStream buffer; + private final int limit; + // cache data size so that it is consistent after the buffer is reset. + private Integer dataSize; + + ByteArrayBlock(long index, + int limit, + BlockOutputStreamStatistics statistics) throws MemoryLimitException { + super(index, statistics); + this.limit = limit; + allocateMemory(limit); + buffer = new OSSByteArrayOutputStream(limit); + blockAllocated(); + bytesAllocated(limit); + } + + /** + * Get the amount of data; if there is no buffer then the size is 0. + * + * @return the amount of data available to upload. + */ + @Override + int dataSize() { + return dataSize != null ? dataSize : buffer.size(); + } + + @Override + BlockUploadData startUpload() throws IOException { + super.startUpload(); + dataSize = buffer.size(); + ByteArrayInputStream bufferData = buffer.getInputStream(); + buffer = null; + return new BlockUploadData(bufferData); + } + + @Override + boolean hasCapacity(long bytes) { + return dataSize() + bytes <= limit; + } + + @Override + int remainingCapacity() { + return limit - dataSize(); + } + + @Override + int write(byte[] b, int offset, int len) throws IOException { + super.write(b, offset, len); + int written = Math.min(remainingCapacity(), len); + buffer.write(b, offset, written); + return written; + } + + @Override + protected void innerClose() { + buffer = null; + releaseMemory(limit); + blockReleased(); + bytesReleased(limit); + } + + @Override + public String toString() { + return "ByteArrayBlock{" + + "index=" + getIndex() + + ", state=" + getState() + + ", limit=" + limit + + ", dataSize=" + dataSize + + '}'; + } + } + } + + // ==================================================================== + + /** + * Stream via Direct ByteBuffers; these are allocated off heap + * via {@link DirectBufferPool}. + */ + static class ByteBufferBlockFactory extends MemoryBlockFactory { + private final DirectBufferPool bufferPool = new DirectBufferPool(); + private final AtomicInteger buffersOutstanding = new AtomicInteger(0); + + ByteBufferBlockFactory(AliyunOSSFileSystem owner) { + super(owner); + } + + @Override + ByteBufferBlock create(long index, int limit, + BlockOutputStreamStatistics statistics) + throws IOException { + try { + return new ByteBufferBlock(index, limit, statistics); + } catch (MemoryLimitException e) { + LOG.debug(e.getMessage() + ", index " + index); + return null; + } + } + + private ByteBuffer requestBuffer(int limit) { + LOG.debug("Requesting buffer of size {}", limit); + buffersOutstanding.incrementAndGet(); + return bufferPool.getBuffer(limit); + } + + private void releaseBuffer(ByteBuffer buffer) { + LOG.debug("Releasing buffer"); + bufferPool.returnBuffer(buffer); + buffersOutstanding.decrementAndGet(); + } + + /** + * Get count of outstanding buffers. + * @return the current buffer count + */ + public int getOutstandingBufferCount() { + return buffersOutstanding.get(); + } + + @Override + public String toString() { + return "ByteBufferBlockFactory{" + + "buffersOutstanding=" + buffersOutstanding + + '}'; + } + + /** + * A DataBlock which requests a buffer from the pool on creation; returns + * it when the block is closed.
+ */ + class ByteBufferBlock extends DataBlock { + private ByteBuffer blockBuffer; + private final int bufferSize; + // cache data size so that it is consistent after the buffer is reset. + private Integer dataSize; + + /** + * Instantiate. This will request a ByteBuffer of the desired size. + * @param index block index + * @param bufferSize buffer size + * @param statistics statistics to update + */ + ByteBufferBlock(long index, int bufferSize, + BlockOutputStreamStatistics statistics) throws MemoryLimitException { + super(index, statistics); + this.bufferSize = bufferSize; + allocateMemory(bufferSize); + blockBuffer = requestBuffer(bufferSize); + blockAllocated(); + bytesAllocated(bufferSize); + } + + /** + * Get the amount of data; if there is no buffer then the size is 0. + * @return the amount of data available to upload. + */ + @Override + int dataSize() { + return dataSize != null ? dataSize : bufferCapacityUsed(); + } + + @Override + BlockUploadData startUpload() throws IOException { + super.startUpload(); + dataSize = bufferCapacityUsed(); + // set the buffer up for reading from the beginning + blockBuffer.limit(blockBuffer.position()); + blockBuffer.position(0); + return new BlockUploadData( + new ByteBufferInputStream(dataSize, blockBuffer)); + } + + @Override + public boolean hasCapacity(long bytes) { + return bytes <= remainingCapacity(); + } + + @Override + public int remainingCapacity() { + return blockBuffer != null ? blockBuffer.remaining() : 0; + } + + private int bufferCapacityUsed() { + return blockBuffer.capacity() - blockBuffer.remaining(); + } + + @Override + int write(byte[] b, int offset, int len) throws IOException { + super.write(b, offset, len); + int written = Math.min(remainingCapacity(), len); + blockBuffer.put(b, offset, written); + return written; + } + + /** + * Closing the block will release the buffer. + */ + @Override + protected void innerClose() { + if (blockBuffer != null) { + releaseMemory(bufferSize); + blockReleased(); + bytesReleased(bufferSize); + releaseBuffer(blockBuffer); + blockBuffer = null; + } + } + + @Override + public String toString() { + return "ByteBufferBlock{" + + "index=" + getIndex() + + ", state=" + getState() + + ", dataSize=" + dataSize() + + ", limit=" + bufferSize + + ", remainingCapacity=" + remainingCapacity() + + '}'; + } + + /** + * Provide an input stream from a byte buffer; supporting + * {@link #mark(int)}, which is required to enable replay of failed + * PUT attempts. + */ + class ByteBufferInputStream extends InputStream { + + private final int size; + private ByteBuffer byteBuffer; + + ByteBufferInputStream(int size, + ByteBuffer byteBuffer) { + LOG.debug("Creating ByteBufferInputStream of size {}", size); + this.size = size; + this.byteBuffer = byteBuffer; + } + + /** + * After the stream is closed, set the local reference to the byte + * buffer to null; this guarantees that future attempts to use + * stream methods will fail. + */ + @Override + public synchronized void close() { + LOG.debug("ByteBufferInputStream.close() for {}", + ByteBufferBlock.super.toString()); + byteBuffer = null; + } + + /** + * Verify that the stream is open.
+ * @throws IOException if the stream is closed + */ + private void verifyOpen() throws IOException { + if (byteBuffer == null) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + } + + public synchronized int read() throws IOException { + if (available() > 0) { + return byteBuffer.get() & 0xFF; + } else { + return -1; + } + } + + @Override + public synchronized long skip(long offset) throws IOException { + verifyOpen(); + long newPos = position() + offset; + if (newPos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); + } + if (newPos > size) { + throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + } + byteBuffer.position((int) newPos); + return newPos; + } + + @Override + public synchronized int available() { + Preconditions.checkState(byteBuffer != null, + FSExceptionMessages.STREAM_IS_CLOSED); + return byteBuffer.remaining(); + } + + /** + * Get the current buffer position. + * @return the buffer position + */ + public synchronized int position() { + return byteBuffer.position(); + } + + /** + * Check if there is data left. + * @return true if there is data remaining in the buffer. + */ + public synchronized boolean hasRemaining() { + return byteBuffer.hasRemaining(); + } + + @Override + public synchronized void mark(int readlimit) { + LOG.debug("mark at {}", position()); + byteBuffer.mark(); + } + + @Override + public synchronized void reset() throws IOException { + LOG.debug("reset"); + byteBuffer.reset(); + } + + @Override + public boolean markSupported() { + return true; + } + + /** + * Read in data. + * @param b destination buffer + * @param offset offset within the buffer + * @param length length of bytes to read + * @throws EOFException if the position is negative + * @throws IndexOutOfBoundsException if there isn't space for the + * amount of data requested. + * @throws IllegalArgumentException other arguments are invalid. + */ + @SuppressWarnings("NullableProblems") + public synchronized int read(byte[] b, int offset, int length) + throws IOException { + Preconditions.checkArgument(length >= 0, "length is negative"); + Preconditions.checkArgument(b != null, "Null buffer"); + if (b.length - offset < length) { + throw new IndexOutOfBoundsException( + FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER + + ": request length =" + length + + ", with offset =" + offset + + "; buffer capacity =" + (b.length - offset)); + } + verifyOpen(); + if (!hasRemaining()) { + return -1; + } + + int toRead = Math.min(length, available()); + byteBuffer.get(b, offset, toRead); + return toRead; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "ByteBufferInputStream{"); + sb.append("size=").append(size); + ByteBuffer buf = this.byteBuffer; + if (buf != null) { + sb.append(", available=").append(buf.remaining()); + } + sb.append(", ").append(ByteBufferBlock.super.toString()); + sb.append('}'); + return sb.toString(); + } + } + } + } + + // ==================================================================== + + /** + * Buffer blocks to disk. + */ + static class DiskBlockFactory extends BlockFactory { + + DiskBlockFactory(AliyunOSSFileSystem owner) { + super(owner); + } + + /** + * Create a temp file and a {@link DiskBlock} instance to manage it. + * + * @param index block index + * @param limit limit of the block. 
+ * @return the new block + * @throws IOException IO problems + */ + @Override + DataBlock create(long index, int limit, + BlockOutputStreamStatistics statistics) + throws IOException { + File destFile = AliyunOSSUtils.createTmpFileForWrite( + String.format("oss-block-%04d-", index), limit, getOwner().getConf()); + return new DiskBlock(destFile, limit, index, statistics); + } + } + + /** + * Stream to a file. + * This will stop at the limit; the caller is expected to create a new block. + */ + static class DiskBlock extends DataBlock { + + private int bytesWritten = 0; + private final File bufferFile; + private final int limit; + private BufferedOutputStream out; + private final AtomicBoolean closed = new AtomicBoolean(false); + + DiskBlock(File bufferFile, + int limit, + long index, + BlockOutputStreamStatistics statistics) + throws FileNotFoundException { + super(index, statistics); + this.limit = limit; + this.bufferFile = bufferFile; + blockAllocated(); + diskBlockAllocated(); + out = new BufferedOutputStream(new FileOutputStream(bufferFile)); + } + + @Override + int dataSize() { + return bytesWritten; + } + + @Override + boolean hasCapacity(long bytes) { + return dataSize() + bytes <= limit; + } + + @Override + int remainingCapacity() { + return limit - bytesWritten; + } + + @Override + int write(byte[] b, int offset, int len) throws IOException { + super.write(b, offset, len); + int written = Math.min(remainingCapacity(), len); + out.write(b, offset, written); + bytesWritten += written; + return written; + } + + @Override + BlockUploadData startUpload() throws IOException { + super.startUpload(); + try { + out.flush(); + } finally { + out.close(); + out = null; + } + return new BlockUploadData(bufferFile); + } + + /** + * The close operation will delete the destination file if it still + * exists. + * @throws IOException IO problems + */ + @SuppressWarnings("UnnecessaryDefault") + @Override + protected void innerClose() throws IOException { + final DestState state = getState(); + LOG.debug("Closing {}", this); + switch (state) { + case Writing: + if (bufferFile.exists()) { + // file was not uploaded + LOG.debug("Block[{}]: Deleting buffer file as upload did not start", + getIndex()); + closeBlock(); + } + break; + + case Upload: + LOG.debug("Block[{}]: Buffer file {} exists - close upload stream", + getIndex(), bufferFile); + break; + + case Closed: + closeBlock(); + break; + + default: + // this state can never be reached, but checkstyle complains, so + // it is here. + } + } + + /** + * Flush operation will flush to disk. + * @throws IOException IOE raised on FileOutputStream + */ + @Override + void flush() throws IOException { + super.flush(); + out.flush(); + } + + @Override + public String toString() { + String sb = "FileBlock{" + + "index=" + getIndex() + + ", destFile=" + bufferFile + + ", state=" + getState() + + ", dataSize=" + dataSize() + + ", limit=" + limit + + '}'; + return sb; + } + + /** + * Close the block. + * This will delete the block's buffer file if the block has + * not previously been closed. + */ + void closeBlock() { + LOG.debug("block[{}]: closeBlock()", getIndex()); + if (!closed.getAndSet(true)) { + blockReleased(); + diskBlockReleased(); + if (!bufferFile.delete() && bufferFile.exists()) { + LOG.warn("delete({}) returned false", + bufferFile.getAbsoluteFile()); + } + } else { + LOG.debug("block[{}]: skipping re-entrant closeBlock()", getIndex()); + } + } + } + + /** + * Buffer blocks to memory and fall back to disk if + * used memory exceeds the quota.
+ */ + static class MemoryAndDiskBlockFactory extends BlockFactory { + private BlockFactory memoryFactory; + private BlockFactory diskFactory; + + MemoryAndDiskBlockFactory(AliyunOSSFileSystem owner, + BlockFactory memoryFactory) { + super(owner); + this.memoryFactory = memoryFactory; + diskFactory = new DiskBlockFactory(owner); + + long memoryLimit = owner.getConf().getLong( + Constants.FAST_UPLOAD_BUFFER_MEMORY_LIMIT, + Constants.FAST_UPLOAD_BUFFER_MEMORY_LIMIT_DEFAULT); + ((MemoryBlockFactory)this.memoryFactory).setMemoryLimit(memoryLimit); + } + + /** + * Create a {@link DataBlock} in memory, falling back to a disk block + * when the memory quota has been reached. + * + * @param index block index + * @param limit limit of the block. + * @return the new block + * @throws IOException IO problems + */ + @Override + DataBlock create(long index, int limit, + BlockOutputStreamStatistics statistics) + throws IOException { + DataBlock block = memoryFactory.create(index, limit, statistics); + if (block != null) { + return block; + } else { + return diskFactory.create(index, limit, statistics); + } + } + + @VisibleForTesting + MemoryBlockFactory getMemoryFactory() { + return (MemoryBlockFactory)memoryFactory; + } + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/BlockOutputStreamStatistics.java new file mode 100644 index 0000000000..51940b7183 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/BlockOutputStreamStatistics.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.statistics; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Block output stream statistics. + */ +@InterfaceStability.Unstable +public interface BlockOutputStreamStatistics { + + /** + * A block has been allocated. + */ + void blockAllocated(); + + /** + * A block has been released. + */ + void blockReleased(); + + /** + * A disk block has been allocated. + */ + void diskBlockAllocated(); + + /** + * A disk block has been released. + */ + void diskBlockReleased(); + + /** + * Memory bytes have been allocated. + * @param size allocated size. + */ + void bytesAllocated(long size); + + /** + * Memory bytes have been released. + * @param size released size.
+ */ + void bytesReleased(long size); + + int getBlocksAllocated(); + + int getBlocksReleased(); + + int getDiskBlocksAllocated(); + + int getDiskBlocksReleased(); + + long getBytesAllocated(); + + long getBytesReleased(); +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/impl/OutputStreamStatistics.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/impl/OutputStreamStatistics.java new file mode 100644 index 0000000000..011a2eecd0 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/impl/OutputStreamStatistics.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.statistics.impl; + +import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implementation of {@link BlockOutputStreamStatistics}. 
+ */ +public class OutputStreamStatistics implements BlockOutputStreamStatistics { + private final AtomicInteger blocksAllocated = new AtomicInteger(0); + private final AtomicInteger blocksReleased = new AtomicInteger(0); + + private final AtomicInteger diskBlocksAllocated = new AtomicInteger(0); + private final AtomicInteger diskBlocksReleased = new AtomicInteger(0); + + private final AtomicLong bytesAllocated = new AtomicLong(0); + private final AtomicLong bytesReleased = new AtomicLong(0); + + @Override + public void blockAllocated() { + blocksAllocated.incrementAndGet(); + } + + @Override + public void blockReleased() { + blocksReleased.incrementAndGet(); + } + + @Override + public void diskBlockAllocated() { + diskBlocksAllocated.incrementAndGet(); + } + + @Override + public void diskBlockReleased() { + diskBlocksReleased.incrementAndGet(); + } + + @Override + public int getBlocksAllocated() { + return blocksAllocated.get(); + } + + @Override + public int getBlocksReleased() { + return blocksReleased.get(); + } + + @Override + public int getDiskBlocksAllocated() { + return diskBlocksAllocated.get(); + } + + @Override + public int getDiskBlocksReleased() { + return diskBlocksReleased.get(); + } + + @Override + public void bytesAllocated(long size) { + bytesAllocated.getAndAdd(size); + } + + @Override + public void bytesReleased(long size) { + bytesReleased.getAndAdd(size); + } + + @Override + public long getBytesAllocated() { + return bytesAllocated.get(); + } + + @Override + public long getBytesReleased() { + return bytesReleased.get(); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/impl/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/impl/package-info.java new file mode 100644 index 0000000000..2f044173bc --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/impl/package-info.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Statistics collection for the OSS connector: implementation. + * Not for use by anything outside the hadoop-aliyun source tree. 
+ */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.fs.aliyun.oss.statistics.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/package-info.java new file mode 100644 index 0000000000..49abca73d5 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/package-info.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Statistics collection for the OSS connector: interfaces. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.fs.aliyun.oss.statistics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md index d48bf1c6b0..578b9595d1 100644 --- a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md +++ b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md @@ -164,7 +164,7 @@ please raise your issues with them. fs.oss.attempts.maximum - 20 + 10 How many times we should retry commands on transient errors. @@ -239,7 +239,7 @@ please raise your issues with them. fs.oss.multipart.download.size - 102400/value> + 524288/value> Size in bytes in each request from ALiyun OSS. @@ -251,9 +251,53 @@ please raise your issues with them. + + fs.oss.fast.upload.buffer + disk + + The buffering mechanism to use. + Values: disk, array, bytebuffer, array_disk, bytebuffer_disk. + + "disk" will use the directories listed in fs.oss.buffer.dir as + the location(s) to save data prior to being uploaded. + + "array" uses arrays in the JVM heap + + "bytebuffer" uses off-heap memory within the JVM. + + Both "array" and "bytebuffer" will consume memory in a single stream up to the number + of blocks set by: + + fs.oss.multipart.upload.size * fs.oss.upload.active.blocks. + + If using either of these mechanisms, keep this value low + + The total number of threads performing work across all threads is set by + fs.oss.multipart.download.threads(Currently fast upload shares the same thread tool with download. + The thread pool size is specified in "fs.oss.multipart.download.threads"), + with fs.oss.max.total.tasks values setting the number of queued work items. + + "array_disk" and "bytebuffer_disk" support fallback to disk. 
@@ -251,9 +251,53 @@ please raise your issues with them.
     </description>
   </property>
 
+  <property>
+    <name>fs.oss.fast.upload.buffer</name>
+    <value>disk</value>
+    <description>
+      The buffering mechanism to use.
+      Values: disk, array, bytebuffer, array_disk, bytebuffer_disk.
+
+      "disk" will use the directories listed in fs.oss.buffer.dir as
+      the location(s) to save data prior to being uploaded.
+
+      "array" uses arrays in the JVM heap.
+
+      "bytebuffer" uses off-heap memory within the JVM.
+
+      Both "array" and "bytebuffer" will consume memory in a single stream
+      up to the number of blocks set by:
+
+      fs.oss.multipart.upload.size * fs.oss.upload.active.blocks.
+
+      If using either of these mechanisms, keep this value low.
+
+      The total number of threads performing work across all streams is set
+      by fs.oss.multipart.download.threads (fast upload currently shares the
+      same thread pool as download), with fs.oss.max.total.tasks setting the
+      number of queued work items.
+
+      "array_disk" and "bytebuffer_disk" support fallback to disk once the
+      memory limit below is reached.
+    </description>
+  </property>
+
+  <property>
+    <name>fs.oss.fast.upload.memory.limit</name>
+    <value>1073741824</value>
+    <description>
+      Memory limit of the "array_disk" and "bytebuffer_disk" upload buffers.
+      Uploads fall back to disk buffers once this much memory is in use.
+    </description>
+  </property>
+
   <property>
     <name>fs.oss.buffer.dir</name>
-    <description>Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS</description>
+    <value>${env.LOCAL_DIRS:-${hadoop.tmp.dir}}/oss</value>
+    <description>Comma separated list of directories to buffer
+      OSS data before uploading to Aliyun OSS.
+      On YARN applications the container's local directories are used by
+      default; otherwise this falls back to hadoop.tmp.dir.
+    </description>
   </property>
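The interaction of the three new options is easier to see in code. A minimal sketch with illustrative values only (neither the 256 MB limit nor the buffer directory is a recommended default):

```java
import org.apache.hadoop.conf.Configuration;

public class OssFastUploadConfigExample {
  public static Configuration fastUploadConf() {
    Configuration conf = new Configuration();
    // Buffer in heap byte arrays, spilling to disk at the memory limit.
    conf.set("fs.oss.fast.upload.buffer", "array_disk");
    // 256 MB of in-memory buffering before the disk fallback kicks in.
    conf.setLong("fs.oss.fast.upload.memory.limit", 256L * 1024 * 1024);
    // Where spilled blocks are written; when unset, the YARN container
    // directories or hadoop.tmp.dir are used, as documented above.
    conf.set("fs.oss.buffer.dir", "/tmp/oss");
    return conf;
  }
}
```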
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
index 69aa0a5a79..891890dfc4 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
@@ -22,6 +22,8 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.aliyun.oss.OSSDataBlocks.ByteBufferBlockFactory;
+import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -37,12 +39,19 @@
 import java.util.LinkedHashSet;
 
 import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BUFFER_ARRAY_DISK;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BUFFER_DISK;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BUFFER_MEMORY_LIMIT;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BYTEBUFFER;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BYTEBUFFER_DISK;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -54,6 +63,7 @@ public class TestAliyunOSSBlockOutputStream {
   private static final int PART_SIZE = 1024 * 1024;
   private static String testRootPath =
       AliyunOSSTestUtils.generateUniqueTestPath();
+  private static final long MEMORY_LIMIT = 10 * 1024 * 1024;
 
   @Rule
   public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@@ -65,6 +75,7 @@ public void setUp() throws Exception {
     conf.setInt(IO_CHUNK_BUFFER_SIZE,
         conf.getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
     conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
+    conf.setLong(FAST_UPLOAD_BUFFER_MEMORY_LIMIT, MEMORY_LIMIT);
     fs = AliyunOSSTestUtils.createTestFileSystem(conf);
   }
 
@@ -82,7 +93,7 @@ private Path getTestPath() {
   @Test
   public void testZeroByteUpload() throws IOException {
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0);
-    bufferDirShouldEmpty();
+    bufferShouldReleased(true);
   }
 
   @Test
@@ -106,20 +117,21 @@ public void testRegularUpload() throws IOException {
     assertEquals(size - 1, statistics.getBytesRead());
     assertEquals(3, statistics.getWriteOps());
     assertEquals(size - 1, statistics.getBytesWritten());
+    bufferShouldReleased();
 
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
     assertEquals(14, statistics.getReadOps());
     assertEquals(2 * size - 1, statistics.getBytesRead());
     assertEquals(6, statistics.getWriteOps());
     assertEquals(2 * size - 1, statistics.getBytesWritten());
+    bufferShouldReleased();
 
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
-    assertEquals(22, statistics.getReadOps());
     assertEquals(3 * size, statistics.getBytesRead());
     assertEquals(10, statistics.getWriteOps());
     assertEquals(3 * size, statistics.getBytesWritten());
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
 
   @Test
@@ -133,19 +145,21 @@ public void testMultiPartUpload() throws IOException {
     assertEquals(size - 1, statistics.getBytesRead());
     assertEquals(8, statistics.getWriteOps());
     assertEquals(size - 1, statistics.getBytesWritten());
+    bufferShouldReleased();
 
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
     assertEquals(34, statistics.getReadOps());
     assertEquals(2 * size - 1, statistics.getBytesRead());
     assertEquals(16, statistics.getWriteOps());
     assertEquals(2 * size - 1, statistics.getBytesWritten());
+    bufferShouldReleased();
 
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
     assertEquals(52, statistics.getReadOps());
     assertEquals(3 * size, statistics.getBytesRead());
     assertEquals(25, statistics.getWriteOps());
     assertEquals(3 * size, statistics.getBytesWritten());
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
 
   @Test
@@ -159,16 +173,18 @@ public void testMultiPartUploadConcurrent() throws IOException {
     assertEquals(size, statistics.getBytesRead());
     assertEquals(52, statistics.getWriteOps());
     assertEquals(size, statistics.getBytesWritten());
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
 
   @Test
   public void testHugeUpload() throws IOException {
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), PART_SIZE - 1);
+    bufferShouldReleased();
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), PART_SIZE);
+    bufferShouldReleased();
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
         MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
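The helper these tests call is rewritten in the next hunk. Its core invariant, that every block and byte the factory allocates is released again, is independent of the buffer type and could be expressed on its own against the `BlockOutputStreamStatistics` interface. A minimal sketch; the `assertNoLeaks` name is invented for illustration:

```java
import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics;

import static org.junit.Assert.assertEquals;

final class BlockLeakCheck {
  private BlockLeakCheck() {
  }

  /** Hypothetical helper: all allocated blocks and bytes must be released. */
  static void assertNoLeaks(BlockOutputStreamStatistics stats) {
    assertEquals("leaked blocks",
        stats.getBlocksAllocated(), stats.getBlocksReleased());
    assertEquals("leaked bytes",
        stats.getBytesAllocated(), stats.getBytesReleased());
  }
}
```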
 
@@ -199,15 +215,43 @@ public void testMultiPartUploadLimit() throws IOException {
   @Test
   public void testSmallUpload() throws IOException {
     long size = fs.getConf().getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024);
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
 
-  private void bufferDirShouldEmpty() throws IOException {
-    Path bufferPath = new Path(fs.getConf().get(BUFFER_DIR_KEY));
-    FileStatus[] files = bufferPath.getFileSystem(
-        fs.getConf()).listStatus(bufferPath);
-    // Temporary file should be deleted
-    assertEquals(0, files.length);
+  private void bufferShouldReleased() throws IOException {
+    bufferShouldReleased(false);
+  }
+
+  private void bufferShouldReleased(boolean zeroSizeFile) throws IOException {
+    String bufferDir = fs.getConf().get(BUFFER_DIR_KEY);
+    String bufferType = fs.getConf().get(FAST_UPLOAD_BUFFER);
+    if (bufferType.equals(FAST_UPLOAD_BUFFER_DISK)) {
+      assertNotNull(bufferDir);
+      Path bufferPath = new Path(fs.getConf().get(BUFFER_DIR_KEY));
+      FileStatus[] files = bufferPath.getFileSystem(
+          fs.getConf()).listStatus(bufferPath);
+      // Temporary files should be deleted
+      assertEquals(0, files.length);
+    } else if (bufferType.equals(FAST_UPLOAD_BYTEBUFFER)) {
+      OSSDataBlocks.ByteBufferBlockFactory
+          blockFactory = (OSSDataBlocks.ByteBufferBlockFactory)
+              ((AliyunOSSFileSystem)fs).getBlockFactory();
+      assertEquals("outstanding buffers in " + blockFactory,
+          0, blockFactory.getOutstandingBufferCount());
+    }
+    BlockOutputStreamStatistics statistics =
+        ((AliyunOSSFileSystem)fs).getBlockOutputStreamStatistics();
+    assertEquals(statistics.getBlocksAllocated(),
+        statistics.getBlocksReleased());
+    if (zeroSizeFile) {
+      assertEquals(0, statistics.getBlocksAllocated());
+    } else {
+      assertTrue(statistics.getBlocksAllocated() >= 1);
+    }
+    assertEquals(statistics.getBytesReleased(),
+        statistics.getBytesAllocated());
   }
 
   @Test
@@ -249,4 +293,127 @@ public void testDirectoryAllocatorRR() throws Throwable {
     assertNotEquals("round robin not working",
         tmp1.getParent(), tmp2.getParent());
   }
+
+  @Test
+  public void testByteBufferIO() throws IOException {
+    try (OSSDataBlocks.ByteBufferBlockFactory factory =
+        new OSSDataBlocks.ByteBufferBlockFactory((AliyunOSSFileSystem)fs)) {
+      int limit = 128;
+      OSSDataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
+          = factory.create(1, limit, null);
+      assertEquals("outstanding buffers in " + factory,
+          1, factory.getOutstandingBufferCount());
+
+      byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
+      int bufferLen = buffer.length;
+      block.write(buffer, 0, bufferLen);
+      assertEquals(bufferLen, block.dataSize());
+      assertEquals("capacity in " + block,
+          limit - bufferLen, block.remainingCapacity());
+      assertTrue("hasCapacity(64) in " + block, block.hasCapacity(64));
+      assertTrue("No capacity in " + block,
+          block.hasCapacity(limit - bufferLen));
+
+      // now start the write
+      OSSDataBlocks.BlockUploadData blockUploadData = block.startUpload();
+      ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream
+          stream =
+          (ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream)
+              blockUploadData.getUploadStream();
+      assertTrue("Mark not supported in " + stream, stream.markSupported());
+      assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());
+
+      int expected = bufferLen;
+      assertEquals("wrong available() in " + stream,
+          expected, stream.available());
+
+      assertEquals('t', stream.read());
+      stream.mark(limit);
+      expected--;
+      assertEquals("wrong available() in " + stream,
+          expected, stream.available());
+
+      // read into a byte array with an offset
+      int offset = 5;
+      byte[] in = new byte[limit];
+      assertEquals(2, stream.read(in, offset, 2));
+      assertEquals('e', in[offset]);
+      assertEquals('s', in[offset + 1]);
+      expected -= 2;
+      assertEquals("wrong available() in " + stream,
+          expected, stream.available());
+
+      // read to end
+      byte[] remainder = new byte[limit];
+      int c;
+      int index = 0;
+      while ((c = stream.read()) >= 0) {
+        remainder[index++] = (byte) c;
+      }
+      assertEquals(expected, index);
+      assertEquals('a', remainder[--index]);
+
+      assertEquals("wrong available() in " + stream,
+          0, stream.available());
+      assertFalse("hasRemaining() in " + stream, stream.hasRemaining());
+
+      // go back to the mark point
+      stream.reset();
+      assertEquals('e', stream.read());
+
+      // closing the stream does not release the buffer;
+      // that only happens when the block itself is closed
+      stream.close();
+      assertEquals("outstanding buffers in " + factory,
+          1, factory.getOutstandingBufferCount());
+      block.close();
+      assertEquals("outstanding buffers in " + factory,
+          0, factory.getOutstandingBufferCount());
+      // a second close must be harmless
+      stream.close();
+      assertEquals("outstanding buffers in " + factory,
+          0, factory.getOutstandingBufferCount());
+    }
+  }
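The mark/reset behaviour asserted above follows the general java.io.InputStream contract rather than anything OSS-specific. A self-contained illustration with a plain ByteArrayInputStream:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class MarkResetExample {
  public static void main(String[] args) throws IOException {
    ByteArrayInputStream in = new ByteArrayInputStream(
        "test data".getBytes(StandardCharsets.US_ASCII));
    System.out.println((char) in.read()); // 't'
    in.mark(128);                         // remember position 1
    System.out.println((char) in.read()); // 'e'
    in.reset();                           // back to the mark
    System.out.println((char) in.read()); // 'e' again
  }
}
```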
} + } + + @Test + public void testFastUploadArrayDisk() throws IOException { + testFastUploadFallback(FAST_UPLOAD_BUFFER_ARRAY_DISK); + } + + @Test + public void testFastUploadByteBufferDisk() throws IOException { + testFastUploadFallback(FAST_UPLOAD_BYTEBUFFER_DISK); + } + + private void testFastUploadFallback(String name) throws IOException { + Configuration conf = fs.getConf(); + fs.close(); + + conf.set(FAST_UPLOAD_BUFFER, name); + + fs = AliyunOSSTestUtils.createTestFileSystem(conf); + long size = 5 * MEMORY_LIMIT; + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size); + OSSDataBlocks.MemoryBlockFactory + blockFactory = ((OSSDataBlocks.MemoryAndDiskBlockFactory) + ((AliyunOSSFileSystem)fs).getBlockFactory()).getMemoryFactory(); + assertEquals(blockFactory.getMemoryUsed(), 0); + + Path bufferPath = new Path(fs.getConf().get(BUFFER_DIR_KEY)); + FileStatus[] files = bufferPath.getFileSystem( + fs.getConf()).listStatus(bufferPath); + // Temporary file should be deleted + assertEquals(0, files.length); + + BlockOutputStreamStatistics statistics = + ((AliyunOSSFileSystem)fs).getBlockOutputStreamStatistics(); + assertEquals(statistics.getBlocksAllocated(), + statistics.getBlocksReleased()); + assertTrue(statistics.getBlocksAllocated() > 1); + assertEquals(statistics.getBytesReleased(), + statistics.getBytesAllocated()); + assertTrue(statistics.getBytesAllocated() >= MEMORY_LIMIT); + assertTrue(statistics.getDiskBlocksAllocated() > 0); + assertEquals(statistics.getDiskBlocksAllocated(), + statistics.getDiskBlocksReleased()); + } }