diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java index 12d551bbc2..0a833b2292 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java @@ -33,7 +33,9 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -50,7 +52,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream { private boolean closed; private String key; private File blockFile; - private List blockFiles = new ArrayList<>(); + private Map blockFiles = new HashMap<>(); private long blockSize; private int blockId = 0; private long blockWritten = 0L; @@ -94,8 +96,9 @@ public synchronized void close() throws IOException { blockStream.flush(); blockStream.close(); - if (!blockFiles.contains(blockFile)) { - blockFiles.add(blockFile); + if (!blockFiles.values().contains(blockFile)) { + blockId++; + blockFiles.put(blockId, blockFile); } try { @@ -107,7 +110,7 @@ public synchronized void close() throws IOException { ListenableFuture partETagFuture = executorService.submit(() -> { PartETag partETag = store.uploadPart(blockFile, key, uploadId, - blockId + 1); + blockId); return partETag; }); partETagsFutures.add(partETagFuture); @@ -120,11 +123,7 @@ public synchronized void close() throws IOException { store.completeMultipartUpload(key, uploadId, partETags); } } finally { - for (File tFile: blockFiles) { - if (tFile.exists() && !tFile.delete()) { - LOG.warn("Failed to delete temporary file {}", tFile); - } - } + removePartFiles(); closed = true; } } @@ -141,38 +140,52 @@ public synchronized void write(byte[] b, int off, int len) if (closed) { throw new IOException("Stream closed."); } - try { - blockStream.write(b, off, len); - blockWritten += len; - if (blockWritten >= blockSize) { - uploadCurrentPart(); - blockWritten = 0L; + blockStream.write(b, off, len); + blockWritten += len; + if (blockWritten >= blockSize) { + uploadCurrentPart(); + blockWritten = 0L; + } + } + + private void removePartFiles() throws IOException { + for (ListenableFuture partETagFuture : partETagsFutures) { + if (!partETagFuture.isDone()) { + continue; } - } finally { - for (File tFile: blockFiles) { - if (tFile.exists() && !tFile.delete()) { - LOG.warn("Failed to delete temporary file {}", tFile); + + try { + File blockFile = blockFiles.get(partETagFuture.get().getPartNumber()); + if (blockFile != null && blockFile.exists() && !blockFile.delete()) { + LOG.warn("Failed to delete temporary file {}", blockFile); } + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); } } } private void uploadCurrentPart() throws IOException { - blockFiles.add(blockFile); blockStream.flush(); blockStream.close(); if (blockId == 0) { uploadId = store.getUploadId(key); } + + blockId++; + blockFiles.put(blockId, blockFile); + + File currentFile = blockFile; + int currentBlockId = blockId; ListenableFuture partETagFuture = executorService.submit(() -> { - PartETag partETag = store.uploadPart(blockFile, key, uploadId, - blockId + 1); + PartETag partETag = store.uploadPart(currentFile, key, uploadId, + currentBlockId); return partETag; }); partETagsFutures.add(partETagFuture); + removePartFiles(); blockFile = newBlockFile(); - blockId++; blockStream = new BufferedOutputStream(new FileOutputStream(blockFile)); } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java index 5e2175926a..dc5f99eece 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -450,6 +450,8 @@ public InputStream retrieve(String key, long byteStart, long byteEnd) { request.setRange(byteStart, byteEnd); return ossClient.getObject(request).getObjectContent(); } catch (OSSException | ClientException e) { + LOG.error("Exception thrown when store retrieves key: " + + key + ", exception: " + e); return null; } } diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java index 365d93142a..6fe6f03107 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java @@ -31,6 +31,7 @@ import java.io.IOException; import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT; +import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; /** * Tests regular and multi-part upload functionality for @@ -48,7 +49,10 @@ public class TestAliyunOSSBlockOutputStream { public void setUp() throws Exception { Configuration conf = new Configuration(); conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024); - conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024); + conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024); + conf.setInt(IO_CHUNK_BUFFER_SIZE, + conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0)); + conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20); fs = AliyunOSSTestUtils.createTestFileSystem(conf); } @@ -84,6 +88,12 @@ public void testMultiPartUpload() throws IOException { 6 * 1024 * 1024 + 1); } + @Test + public void testMultiPartUploadConcurrent() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), + 50 * 1024 * 1024 - 1); + } + @Test public void testHugeUpload() throws IOException { ContractTestUtils.createAndVerifyFile(fs, getTestPath(),