HADOOP-15607. AliyunOSS: fix duplicated partNumber issue in AliyunOSSBlockOutputStream. Contributed by Jinhu Wu.

This commit is contained in:
Sammi Chen 2018-07-30 10:53:44 +08:00
parent 007e6f5113
commit 0857f116b7
3 changed files with 49 additions and 24 deletions

View File

@ -33,7 +33,9 @@
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -50,7 +52,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
private boolean closed; private boolean closed;
private String key; private String key;
private File blockFile; private File blockFile;
private List<File> blockFiles = new ArrayList<>(); private Map<Integer, File> blockFiles = new HashMap<>();
private long blockSize; private long blockSize;
private int blockId = 0; private int blockId = 0;
private long blockWritten = 0L; private long blockWritten = 0L;
@ -94,8 +96,9 @@ public synchronized void close() throws IOException {
blockStream.flush(); blockStream.flush();
blockStream.close(); blockStream.close();
if (!blockFiles.contains(blockFile)) { if (!blockFiles.values().contains(blockFile)) {
blockFiles.add(blockFile); blockId++;
blockFiles.put(blockId, blockFile);
} }
try { try {
@ -107,7 +110,7 @@ public synchronized void close() throws IOException {
ListenableFuture<PartETag> partETagFuture = ListenableFuture<PartETag> partETagFuture =
executorService.submit(() -> { executorService.submit(() -> {
PartETag partETag = store.uploadPart(blockFile, key, uploadId, PartETag partETag = store.uploadPart(blockFile, key, uploadId,
blockId + 1); blockId);
return partETag; return partETag;
}); });
partETagsFutures.add(partETagFuture); partETagsFutures.add(partETagFuture);
@ -120,11 +123,7 @@ public synchronized void close() throws IOException {
store.completeMultipartUpload(key, uploadId, partETags); store.completeMultipartUpload(key, uploadId, partETags);
} }
} finally { } finally {
for (File tFile: blockFiles) { removePartFiles();
if (tFile.exists() && !tFile.delete()) {
LOG.warn("Failed to delete temporary file {}", tFile);
}
}
closed = true; closed = true;
} }
} }
@ -141,38 +140,52 @@ public synchronized void write(byte[] b, int off, int len)
if (closed) { if (closed) {
throw new IOException("Stream closed."); throw new IOException("Stream closed.");
} }
try {
blockStream.write(b, off, len); blockStream.write(b, off, len);
blockWritten += len; blockWritten += len;
if (blockWritten >= blockSize) { if (blockWritten >= blockSize) {
uploadCurrentPart(); uploadCurrentPart();
blockWritten = 0L; blockWritten = 0L;
} }
} finally {
for (File tFile: blockFiles) {
if (tFile.exists() && !tFile.delete()) {
LOG.warn("Failed to delete temporary file {}", tFile);
} }
private void removePartFiles() throws IOException {
for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
if (!partETagFuture.isDone()) {
continue;
}
try {
File blockFile = blockFiles.get(partETagFuture.get().getPartNumber());
if (blockFile != null && blockFile.exists() && !blockFile.delete()) {
LOG.warn("Failed to delete temporary file {}", blockFile);
}
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
} }
} }
} }
private void uploadCurrentPart() throws IOException { private void uploadCurrentPart() throws IOException {
blockFiles.add(blockFile);
blockStream.flush(); blockStream.flush();
blockStream.close(); blockStream.close();
if (blockId == 0) { if (blockId == 0) {
uploadId = store.getUploadId(key); uploadId = store.getUploadId(key);
} }
blockId++;
blockFiles.put(blockId, blockFile);
File currentFile = blockFile;
int currentBlockId = blockId;
ListenableFuture<PartETag> partETagFuture = ListenableFuture<PartETag> partETagFuture =
executorService.submit(() -> { executorService.submit(() -> {
PartETag partETag = store.uploadPart(blockFile, key, uploadId, PartETag partETag = store.uploadPart(currentFile, key, uploadId,
blockId + 1); currentBlockId);
return partETag; return partETag;
}); });
partETagsFutures.add(partETagFuture); partETagsFutures.add(partETagFuture);
removePartFiles();
blockFile = newBlockFile(); blockFile = newBlockFile();
blockId++;
blockStream = new BufferedOutputStream(new FileOutputStream(blockFile)); blockStream = new BufferedOutputStream(new FileOutputStream(blockFile));
} }

View File

@ -450,6 +450,8 @@ public InputStream retrieve(String key, long byteStart, long byteEnd) {
request.setRange(byteStart, byteEnd); request.setRange(byteStart, byteEnd);
return ossClient.getObject(request).getObjectContent(); return ossClient.getObject(request).getObjectContent();
} catch (OSSException | ClientException e) { } catch (OSSException | ClientException e) {
LOG.error("Exception thrown when store retrieves key: "
+ key + ", exception: " + e);
return null; return null;
} }
} }

View File

@ -31,6 +31,7 @@
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT; import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
/** /**
* Tests regular and multi-part upload functionality for * Tests regular and multi-part upload functionality for
@ -48,7 +49,10 @@ public class TestAliyunOSSBlockOutputStream {
public void setUp() throws Exception { public void setUp() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024); conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024); conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
conf.setInt(IO_CHUNK_BUFFER_SIZE,
conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
fs = AliyunOSSTestUtils.createTestFileSystem(conf); fs = AliyunOSSTestUtils.createTestFileSystem(conf);
} }
@ -84,6 +88,12 @@ public void testMultiPartUpload() throws IOException {
6 * 1024 * 1024 + 1); 6 * 1024 * 1024 + 1);
} }
@Test
public void testMultiPartUploadConcurrent() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
50 * 1024 * 1024 - 1);
}
@Test @Test
public void testHugeUpload() throws IOException { public void testHugeUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), ContractTestUtils.createAndVerifyFile(fs, getTestPath(),