MAPREDUCE-7437. MR Fetcher class to use an AtomicInteger to generate IDs. (#5579)

...as until now it wasn't thread safe

Contributed by Steve Loughran
This commit is contained in:
Steve Loughran 2023-04-25 19:53:40 +01:00 committed by GitHub
parent 6aac6cb212
commit b6b9bd67bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -31,6 +31,7 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.HttpsURLConnection;
@ -91,7 +92,7 @@ public enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
protected final ShuffleClientMetrics metrics; protected final ShuffleClientMetrics metrics;
protected final ExceptionReporter exceptionReporter; protected final ExceptionReporter exceptionReporter;
protected final int id; protected final int id;
private static int nextId = 0; private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
protected final int reduce; protected final int reduce;
private final int connectionTimeout; private final int connectionTimeout;
@ -118,7 +119,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,
Reporter reporter, ShuffleClientMetrics metrics, Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey shuffleKey) { ExceptionReporter exceptionReporter, SecretKey shuffleKey) {
this(job, reduceId, scheduler, merger, reporter, metrics, this(job, reduceId, scheduler, merger, reporter, metrics,
exceptionReporter, shuffleKey, ++nextId); exceptionReporter, shuffleKey, NEXT_ID.incrementAndGet());
} }
@VisibleForTesting @VisibleForTesting