HADOOP-16306. AliyunOSS: Remove temporary files when upload small files to OSS. Contributed by wujinhu.

This commit is contained in:
Weiwei Yang 2019-05-14 14:05:39 -07:00
parent e2dfdaee7c
commit 2d8282bb82
2 changed files with 39 additions and 3 deletions

View File

@ -124,7 +124,7 @@ public synchronized void close() throws IOException {
new ArrayList<>(partETags)); new ArrayList<>(partETags));
} }
} finally { } finally {
removePartFiles(); removeTemporaryFiles();
closed = true; closed = true;
} }
} }
@ -149,6 +149,14 @@ public synchronized void write(byte[] b, int off, int len)
} }
} }
/**
 * Best-effort cleanup of the local per-block buffer files.
 *
 * A failed delete is only logged (never thrown) so that close() can
 * still finish releasing the stream's other resources.
 */
private void removeTemporaryFiles() {
  blockFiles.values().stream()
      .filter(f -> f != null && f.exists())
      .forEach(f -> {
        if (!f.delete()) {
          LOG.warn("Failed to delete temporary file {}", f);
        }
      });
}
private void removePartFiles() throws IOException { private void removePartFiles() throws IOException {
for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) { for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
if (!partETagFuture.isDone()) { if (!partETagFuture.isDone()) {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.aliyun.oss; package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils;
@ -30,7 +31,9 @@
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT; import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY;
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -49,9 +52,9 @@ public class TestAliyunOSSBlockOutputStream {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024); conf.setInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
conf.setInt(IO_CHUNK_BUFFER_SIZE, conf.setInt(IO_CHUNK_BUFFER_SIZE,
conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0)); conf.getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20); conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
fs = AliyunOSSTestUtils.createTestFileSystem(conf); fs = AliyunOSSTestUtils.createTestFileSystem(conf);
} }
@ -70,6 +73,7 @@ private Path getTestPath() {
@Test @Test
public void testZeroByteUpload() throws IOException { public void testZeroByteUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0);
bufferDirShouldEmpty();
} }
@Test @Test
@ -106,6 +110,7 @@ public void testRegularUpload() throws IOException {
assertEquals(3 * size, statistics.getBytesRead()); assertEquals(3 * size, statistics.getBytesRead());
assertEquals(10, statistics.getWriteOps()); assertEquals(10, statistics.getWriteOps());
assertEquals(3 * size, statistics.getBytesWritten()); assertEquals(3 * size, statistics.getBytesWritten());
bufferDirShouldEmpty();
} }
@Test @Test
@ -131,6 +136,7 @@ public void testMultiPartUpload() throws IOException {
assertEquals(3 * size, statistics.getBytesRead()); assertEquals(3 * size, statistics.getBytesRead());
assertEquals(25, statistics.getWriteOps()); assertEquals(25, statistics.getWriteOps());
assertEquals(3 * size, statistics.getBytesWritten()); assertEquals(3 * size, statistics.getBytesWritten());
bufferDirShouldEmpty();
} }
@Test @Test
@ -144,6 +150,7 @@ public void testMultiPartUploadConcurrent() throws IOException {
assertEquals(size, statistics.getBytesRead()); assertEquals(size, statistics.getBytesRead());
assertEquals(52, statistics.getWriteOps()); assertEquals(52, statistics.getWriteOps());
assertEquals(size, statistics.getBytesWritten()); assertEquals(size, statistics.getBytesWritten());
bufferDirShouldEmpty();
} }
@Test @Test
@ -154,6 +161,7 @@ public void testHugeUpload() throws IOException {
MULTIPART_UPLOAD_PART_SIZE_DEFAULT); MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1); MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
bufferDirShouldEmpty();
} }
@Test @Test
@ -174,4 +182,24 @@ public void testMultiPartUploadLimit() throws IOException {
assert(10001 * 100 * 1024 / partSize4 assert(10001 * 100 * 1024 / partSize4
< Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
} }
/**
 * This test is used to verify HADOOP-16306.
 * Test small file uploading so that oss fs will upload file directly
 * instead of multi part upload.
 */
@Test
public void testSmallUpload() throws IOException {
  // One byte below the part size forces the single-put (non-multipart) path.
  long size = fs.getConf().getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024);
  ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
  // HADOOP-16306: the local buffer file must be removed even on this path.
  bufferDirShouldEmpty();
}
/**
 * Asserts that the configured buffer directory contains no leftover
 * temporary files, i.e. the output stream cleaned up after itself.
 *
 * @throws IOException if the buffer directory cannot be listed
 */
private void bufferDirShouldEmpty() throws IOException {
  Configuration conf = fs.getConf();
  Path bufferDir = new Path(conf.get(BUFFER_DIR_KEY));
  FileStatus[] leftovers = bufferDir.getFileSystem(conf).listStatus(bufferDir);
  // Temporary file should be deleted
  assertEquals(0, leftovers.length);
}
} }