HADOOP-11446 S3AOutputStream should use shared thread pool to avoid OutOfMemoryError

This commit is contained in:
Steve Loughran 2015-01-05 12:59:48 +00:00
parent 21c6f01831
commit 27d8395867
4 changed files with 109 additions and 15 deletions

View File

@ -663,6 +663,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11039. ByteBufferReadable API doc is inconsistent with the HADOOP-11039. ByteBufferReadable API doc is inconsistent with the
implementations. (Yi Liu via Colin P. McCabe) implementations. (Yi Liu via Colin P. McCabe)
HADOOP-11446. S3AOutputStream should use shared thread pool to
avoid OutOfMemoryError. (Ted Yu via stevel)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -41,6 +41,23 @@ public class Constants {
public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum"; public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
public static final int DEFAULT_MAX_PAGING_KEYS = 5000; public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
// the maximum number of threads to allow in the pool used by TransferManager
public static final String MAX_THREADS = "fs.s3a.threads.max";
public static final int DEFAULT_MAX_THREADS = 256;
// the number of threads to keep in the pool used by TransferManager
public static final String CORE_THREADS = "fs.s3a.threads.core";
public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS;
// when the number of threads is greater than the core, this is the maximum time
// that excess idle threads will wait for new tasks before terminating.
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
public static final int DEFAULT_KEEPALIVE_TIME = 60;
// the maximum number of tasks that the LinkedBlockingQueue can hold
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
public static final int DEFAULT_MAX_TOTAL_TASKS = 1000;
// size of each of or multipart pieces in bytes // size of each of or multipart pieces in bytes
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size"; public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB

View File

@ -26,6 +26,11 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.s3.S3Credentials; import org.apache.hadoop.fs.s3.S3Credentials;
@ -77,6 +82,7 @@ public class S3AFileSystem extends FileSystem {
private String bucket; private String bucket;
private int maxKeys; private int maxKeys;
private long partSize; private long partSize;
private TransferManager transfers;
private int partSizeThreshold; private int partSizeThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private CannedAccessControlList cannedACL; private CannedAccessControlList cannedACL;
@ -85,6 +91,55 @@ public class S3AFileSystem extends FileSystem {
// The maximum number of entries that can be deleted in any call to s3 // The maximum number of entries that can be deleted in any call to s3
private static final int MAX_ENTRIES_TO_DELETE = 1000; private static final int MAX_ENTRIES_TO_DELETE = 1000;
private static final AtomicInteger poolNumber = new AtomicInteger(1);
/**
* Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
* with a common prefix.
* @param prefix The prefix of every created Thread's name
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
*/
public static ThreadFactory getNamedThreadFactory(final String prefix) {
SecurityManager s = System.getSecurityManager();
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
.getThreadGroup();
return new ThreadFactory() {
final AtomicInteger threadNumber = new AtomicInteger(1);
private final int poolNum = poolNumber.getAndIncrement();
final ThreadGroup group = threadGroup;
@Override
public Thread newThread(Runnable r) {
final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
return new Thread(group, r, name);
}
};
}
/**
* Get a named {@link ThreadFactory} that just builds daemon threads.
* @param prefix name prefix for all threads created from the factory
* @return a thread factory that creates named, daemon threads with
* the supplied exception handler and normal priority
*/
private static ThreadFactory newDaemonThreadFactory(final String prefix) {
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = namedFactory.newThread(r);
if (!t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
};
}
/** Called after a new FileSystem instance is constructed. /** Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc. * @param name a uri whose authority section names the host, port, etc.
* for this FileSystem * for this FileSystem
@ -93,7 +148,6 @@ public class S3AFileSystem extends FileSystem {
public void initialize(URI name, Configuration conf) throws IOException { public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf); super.initialize(name, conf);
uri = URI.create(name.getScheme() + "://" + name.getAuthority()); uri = URI.create(name.getScheme() + "://" + name.getAuthority());
workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
this.getWorkingDirectory()); this.getWorkingDirectory());
@ -138,6 +192,34 @@ public void initialize(URI name, Configuration conf) throws IOException {
partSizeThreshold = 5 * 1024 * 1024; partSizeThreshold = 5 * 1024 * 1024;
} }
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads *
conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
newDaemonThreadFactory("s3a-transfer-shared-"));
tpe.allowCoreThreadTimeOut(true);
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize);
transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
transfers = new TransferManager(s3, tpe);
transfers.setConfiguration(transferConfiguration);
String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL); String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
if (!cannedACLName.isEmpty()) { if (!cannedACLName.isEmpty()) {
cannedACL = CannedAccessControlList.valueOf(cannedACLName); cannedACL = CannedAccessControlList.valueOf(cannedACLName);
@ -155,11 +237,10 @@ public void initialize(URI name, Configuration conf) throws IOException {
DEFAULT_PURGE_EXISTING_MULTIPART_AGE); DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
if (purgeExistingMultipart) { if (purgeExistingMultipart) {
TransferManager transferManager = new TransferManager(s3);
Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000); Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
transferManager.abortMultipartUploads(bucket, purgeBefore); transfers.abortMultipartUploads(bucket, purgeBefore);
transferManager.shutdownNow(false); transfers.shutdownNow(false);
} }
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
@ -245,7 +326,7 @@ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwr
} }
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
return new FSDataOutputStream(new S3AOutputStream(getConf(), s3, this, return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
bucket, key, progress, cannedACL, statistics, bucket, key, progress, cannedACL, statistics,
serverSideEncryptionAlgorithm), null); serverSideEncryptionAlgorithm), null);
} }

View File

@ -49,7 +49,7 @@ public class S3AOutputStream extends OutputStream {
private boolean closed; private boolean closed;
private String key; private String key;
private String bucket; private String bucket;
private AmazonS3Client client; private TransferManager transfers;
private Progressable progress; private Progressable progress;
private long partSize; private long partSize;
private int partSizeThreshold; private int partSizeThreshold;
@ -61,14 +61,14 @@ public class S3AOutputStream extends OutputStream {
public static final Logger LOG = S3AFileSystem.LOG; public static final Logger LOG = S3AFileSystem.LOG;
public S3AOutputStream(Configuration conf, AmazonS3Client client, public S3AOutputStream(Configuration conf, TransferManager transfers,
S3AFileSystem fs, String bucket, String key, Progressable progress, S3AFileSystem fs, String bucket, String key, Progressable progress,
CannedAccessControlList cannedACL, FileSystem.Statistics statistics, CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
String serverSideEncryptionAlgorithm) String serverSideEncryptionAlgorithm)
throws IOException { throws IOException {
this.bucket = bucket; this.bucket = bucket;
this.key = key; this.key = key;
this.client = client; this.transfers = transfers;
this.progress = progress; this.progress = progress;
this.fs = fs; this.fs = fs;
this.cannedACL = cannedACL; this.cannedACL = cannedACL;
@ -114,13 +114,6 @@ public synchronized void close() throws IOException {
try { try {
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize);
transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
TransferManager transfers = new TransferManager(client);
transfers.setConfiguration(transferConfiguration);
final ObjectMetadata om = new ObjectMetadata(); final ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setServerSideEncryption(serverSideEncryptionAlgorithm); om.setServerSideEncryption(serverSideEncryptionAlgorithm);