HDFS-12479. Some misuses of lock in DFSStripedOutputStream. Contributed by Huafeng Wang

This commit is contained in:
Kai Zheng 2017-09-19 17:45:41 +08:00
parent 2018538fdb
commit dba7a7dd9d

View File

@ -63,6 +63,7 @@
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -85,11 +86,10 @@ static class MultipleBlockingQueue<T> {
private final List<BlockingQueue<T>> queues; private final List<BlockingQueue<T>> queues;
MultipleBlockingQueue(int numQueue, int queueSize) { MultipleBlockingQueue(int numQueue, int queueSize) {
List<BlockingQueue<T>> list = new ArrayList<>(numQueue); queues = new ArrayList<>(numQueue);
for (int i = 0; i < numQueue; i++) { for (int i = 0; i < numQueue; i++) {
list.add(new LinkedBlockingQueue<T>(queueSize)); queues.add(new LinkedBlockingQueue<T>(queueSize));
} }
queues = Collections.synchronizedList(list);
} }
void offer(int i, T object) { void offer(int i, T object) {
@ -156,8 +156,7 @@ static class Coordinator {
followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1); newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
updateStreamerMap = Collections.synchronizedMap( updateStreamerMap = new ConcurrentHashMap<>(numAllBlocks);
new HashMap<StripedDataStreamer, Boolean>(numAllBlocks));
streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1); streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1);
} }