diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java index 404eea9618..f13942d382 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java @@ -15,7 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.util; + +package org.apache.hadoop.fs.s3a; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -41,7 +42,7 @@ * this s4 threadpool */ @InterfaceAudience.Private -public final class BlockingThreadPoolExecutorService +final class BlockingThreadPoolExecutorService extends SemaphoredDelegatingExecutor { private static final Logger LOG = LoggerFactory @@ -85,7 +86,7 @@ public Thread newThread(Runnable r) { * @return a thread factory that creates named, daemon threads with * the supplied exception handler and normal priority */ - public static ThreadFactory newDaemonThreadFactory(final String prefix) { + static ThreadFactory newDaemonThreadFactory(final String prefix) { final ThreadFactory namedFactory = getNamedThreadFactory(prefix); return new ThreadFactory() { @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index e9277586e4..63a43492b4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -112,10 +112,8 @@ import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Invoker.*; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java index bcc19e35e8..6b21912871 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.util; +package org.apache.hadoop.fs.s3a; import com.google.common.util.concurrent.ForwardingListeningExecutorService; import com.google.common.util.concurrent.Futures; @@ -42,13 +42,17 @@ * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code * contains the thread pool logic, whereas this isolates the semaphore * and submit logic for use with other thread pools and delegation models. + * In particular, it permits multiple per stream executors to share a + * single per-FS-instance executor; the latter to throttle overall + * load from the the FS, the others to limit the amount of load which + * a single output stream can generate. *
* This is inspired by
* this s4 threadpool
*/
@SuppressWarnings("NullableProblems")
@InterfaceAudience.Private
-public class SemaphoredDelegatingExecutor extends
+class SemaphoredDelegatingExecutor extends
ForwardingListeningExecutorService {
private final Semaphore queueingPermits;
@@ -61,8 +65,7 @@ public class SemaphoredDelegatingExecutor extends
* @param permitCount number of permits into the queue permitted
* @param fair should the semaphore be "fair"
*/
- public SemaphoredDelegatingExecutor(
- ListeningExecutorService executorDelegatee,
+ SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
int permitCount,
boolean fair) {
this.permitCount = permitCount;
@@ -187,7 +190,7 @@ class RunnableWithPermitRelease implements Runnable {
private Runnable delegatee;
- RunnableWithPermitRelease(Runnable delegatee) {
+ public RunnableWithPermitRelease(Runnable delegatee) {
this.delegatee = delegatee;
}
@@ -209,7 +212,7 @@ class CallableWithPermitRelease