Revert "HADOOP-15039. Move SemaphoredDelegatingExecutor to hadoop-common. Contributed by Genmao Yu"
This reverts commit 479d6a5792262c977025c26fd4960574b0db6847
This commit is contained in:
parent
438c1d333e
commit
28792b6b7f
@ -15,7 +15,8 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.util;
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
@ -41,7 +42,7 @@
|
|||||||
* this s4 threadpool</a>
|
* this s4 threadpool</a>
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class BlockingThreadPoolExecutorService
|
final class BlockingThreadPoolExecutorService
|
||||||
extends SemaphoredDelegatingExecutor {
|
extends SemaphoredDelegatingExecutor {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
@ -85,7 +86,7 @@ public Thread newThread(Runnable r) {
|
|||||||
* @return a thread factory that creates named, daemon threads with
|
* @return a thread factory that creates named, daemon threads with
|
||||||
* the supplied exception handler and normal priority
|
* the supplied exception handler and normal priority
|
||||||
*/
|
*/
|
||||||
public static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
||||||
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
||||||
return new ThreadFactory() {
|
return new ThreadFactory() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -112,10 +112,8 @@
|
|||||||
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
import static org.apache.hadoop.fs.s3a.Invoker.*;
|
import static org.apache.hadoop.fs.s3a.Invoker.*;
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.util;
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
|
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
@ -42,13 +42,17 @@
|
|||||||
* This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
|
* This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
|
||||||
* contains the thread pool logic, whereas this isolates the semaphore
|
* contains the thread pool logic, whereas this isolates the semaphore
|
||||||
* and submit logic for use with other thread pools and delegation models.
|
* and submit logic for use with other thread pools and delegation models.
|
||||||
|
* In particular, it <i>permits multiple per stream executors to share a
|
||||||
|
* single per-FS-instance executor; the latter to throttle overall
|
||||||
|
* load from the the FS, the others to limit the amount of load which
|
||||||
|
* a single output stream can generate.</i>
|
||||||
* <p>
|
* <p>
|
||||||
* This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
|
* This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
|
||||||
* this s4 threadpool</a>
|
* this s4 threadpool</a>
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("NullableProblems")
|
@SuppressWarnings("NullableProblems")
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class SemaphoredDelegatingExecutor extends
|
class SemaphoredDelegatingExecutor extends
|
||||||
ForwardingListeningExecutorService {
|
ForwardingListeningExecutorService {
|
||||||
|
|
||||||
private final Semaphore queueingPermits;
|
private final Semaphore queueingPermits;
|
||||||
@ -61,8 +65,7 @@ public class SemaphoredDelegatingExecutor extends
|
|||||||
* @param permitCount number of permits into the queue permitted
|
* @param permitCount number of permits into the queue permitted
|
||||||
* @param fair should the semaphore be "fair"
|
* @param fair should the semaphore be "fair"
|
||||||
*/
|
*/
|
||||||
public SemaphoredDelegatingExecutor(
|
SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
|
||||||
ListeningExecutorService executorDelegatee,
|
|
||||||
int permitCount,
|
int permitCount,
|
||||||
boolean fair) {
|
boolean fair) {
|
||||||
this.permitCount = permitCount;
|
this.permitCount = permitCount;
|
||||||
@ -187,7 +190,7 @@ class RunnableWithPermitRelease implements Runnable {
|
|||||||
|
|
||||||
private Runnable delegatee;
|
private Runnable delegatee;
|
||||||
|
|
||||||
RunnableWithPermitRelease(Runnable delegatee) {
|
public RunnableWithPermitRelease(Runnable delegatee) {
|
||||||
this.delegatee = delegatee;
|
this.delegatee = delegatee;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -209,7 +212,7 @@ class CallableWithPermitRelease<T> implements Callable<T> {
|
|||||||
|
|
||||||
private Callable<T> delegatee;
|
private Callable<T> delegatee;
|
||||||
|
|
||||||
CallableWithPermitRelease(Callable<T> delegatee) {
|
public CallableWithPermitRelease(Callable<T> delegatee) {
|
||||||
this.delegatee = delegatee;
|
this.delegatee = delegatee;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,8 +19,6 @@
|
|||||||
package org.apache.hadoop.fs.s3a;
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
|
||||||
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
|
||||||
import org.apache.hadoop.util.StopWatch;
|
import org.apache.hadoop.util.StopWatch;
|
||||||
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
Loading…
Reference in New Issue
Block a user