diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java index 3b2f5d5f26..1ef7fe72ae 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java @@ -23,6 +23,7 @@ import java.net.URI; import com.aliyun.oss.common.auth.CredentialsProvider; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Preconditions; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -39,7 +40,7 @@ final public class AliyunOSSUtils { private static final Logger LOG = LoggerFactory.getLogger(AliyunOSSUtils.class); - private static LocalDirAllocator directoryAllocator; + private static volatile LocalDirAllocator directoryAllocator; private AliyunOSSUtils() { } @@ -171,21 +172,33 @@ public static boolean objectRepresentsDirectory(final String name, /** * Demand create the directory allocator, then create a temporary file. - * @param path prefix for the temporary file - * @param size the size of the file that is going to be written - * @param conf the Configuration object - * @return a unique temporary file - * @throws IOException IO problems + * This does not mark the file for deletion when a process exits. + * {@link LocalDirAllocator#createTmpFileForWrite( + * String, long, Configuration)}. + * @param pathStr prefix for the temporary file + * @param size the size of the file that is going to be written + * @param conf the Configuration object + * @return a unique temporary file + * @throws IOException IO problems */ - public static File createTmpFileForWrite(String path, long size, + public static File createTmpFileForWrite(String pathStr, long size, Configuration conf) throws IOException { if (conf.get(BUFFER_DIR_KEY) == null) { conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss"); } if (directoryAllocator == null) { - directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY); + synchronized (AliyunOSSUtils.class) { + if (directoryAllocator == null) { + directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY); + } + } } - return directoryAllocator.createTmpFileForWrite(path, size, conf); + Path path = directoryAllocator.getLocalPathForWrite(pathStr, + size, conf); + File dir = new File(path.getParent().toUri().getPath()); + String prefix = path.getName(); + // create a temp file on this directory + return File.createTempFile(prefix, null, dir); } /** diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java index f6e0b7731d..69aa0a5a79 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java @@ -23,19 +23,27 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.LinkedHashSet; import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY; import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT; import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY; import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; /** * Tests regular and multi-part upload functionality for @@ -201,4 +209,44 @@ private void bufferDirShouldEmpty() throws IOException { // Temporary file should be deleted assertEquals(0, files.length); } + + @Test + public void testDirectoryAllocator() throws Throwable { + Configuration conf = fs.getConf(); + File tmp = AliyunOSSUtils.createTmpFileForWrite("out-", 1024, conf); + assertTrue("not found: " + tmp, tmp.exists()); + tmp.delete(); + + // tmp should not in DeleteOnExitHook + try { + Class c = Class.forName("java.io.DeleteOnExitHook"); + Field field = c.getDeclaredField("files"); + field.setAccessible(true); + String name = field.getName(); + LinkedHashSet files = (LinkedHashSet)field.get(name); + assertTrue("in DeleteOnExitHook", files.isEmpty()); + assertFalse("in DeleteOnExitHook", + (new ArrayList<>(files)).contains(tmp.getPath())); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void testDirectoryAllocatorRR() throws Throwable { + File dir1 = GenericTestUtils.getRandomizedTestDir(); + File dir2 = GenericTestUtils.getRandomizedTestDir(); + dir1.mkdirs(); + dir2.mkdirs(); + + Configuration conf = new Configuration(); + conf.set(BUFFER_DIR_KEY, dir1 + ", " + dir2); + fs = AliyunOSSTestUtils.createTestFileSystem(conf); + File tmp1 = AliyunOSSUtils.createTmpFileForWrite("out-", 1024, conf); + tmp1.delete(); + File tmp2 = AliyunOSSUtils.createTmpFileForWrite("out-", 1024, conf); + tmp2.delete(); + assertNotEquals("round robin not working", + tmp1.getParent(), tmp2.getParent()); + } }