diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index a37bf33bd9..cd3f423213 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -263,11 +263,15 @@ ContainerInfo allocateContainer(final PipelineManager pipelineManager, } pipeline = pipelines.get((int) containerCount.get() % pipelines.size()); } - return allocateContainer(pipelineManager, owner, pipeline); + synchronized (pipeline) { + return allocateContainer(pipelineManager, owner, pipeline); + } } /** * Allocates a new container based on the type, replication etc. + * This method should be called only after the lock on the pipeline is held + * on which the container will be allocated. * * @param pipelineManager - Pipeline Manager class. * @param owner - Owner of the container. @@ -296,10 +300,10 @@ ContainerInfo allocateContainer( .setReplicationFactor(pipeline.getFactor()) .setReplicationType(pipeline.getType()) .build(); - pipelineManager.addContainerToPipeline(pipeline.getId(), - ContainerID.valueof(containerID)); Preconditions.checkNotNull(containerInfo); containers.addContainer(containerInfo); + pipelineManager.addContainerToPipeline(pipeline.getId(), + ContainerID.valueof(containerID)); containerStateCount.incrementAndGet(containerInfo.getState()); LOG.trace("New container allocated: {}", containerInfo); return containerInfo; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index 80d7ec10e0..359731cfe4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -386,18 +386,17 @@ public ContainerInfo getMatchingContainer(final long sizeRequired, public ContainerInfo getMatchingContainer(final long sizeRequired, String owner, Pipeline pipeline, List excludedContainers) { + NavigableSet containerIDs; try { - //TODO: #CLUTIL See if lock is required here - NavigableSet containerIDs = - pipelineManager.getContainersInPipeline(pipeline.getId()); + synchronized (pipeline) { + //TODO: #CLUTIL See if lock is required here + containerIDs = + pipelineManager.getContainersInPipeline(pipeline.getId()); - containerIDs = getContainersForOwner(containerIDs, owner); - if (containerIDs.size() < numContainerPerOwnerInPipeline) { - synchronized (pipeline) { + containerIDs = getContainersForOwner(containerIDs, owner); + if (containerIDs.size() < numContainerPerOwnerInPipeline) { // TODO: #CLUTIL Maybe we can add selection logic inside synchronized // as well - containerIDs = getContainersForOwner( - pipelineManager.getContainersInPipeline(pipeline.getId()), owner); if (containerIDs.size() < numContainerPerOwnerInPipeline) { ContainerInfo containerInfo = containerStateManager.allocateContainer(pipelineManager, owner, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index 6a98a346f7..e5c4766697 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -19,7 +19,13 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; + import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -132,6 +138,43 @@ public void testAllocateBlock() throws Exception { Assert.assertNotNull(block); } + @Test + public void testAllocateBlockInParallel() throws Exception { + eventQueue.fireEvent(SCMEvents.SAFE_MODE_STATUS, safeModeStatus); + GenericTestUtils.waitFor(() -> { + return !blockManager.isScmInSafeMode(); + }, 10, 1000 * 5); + int threadCount = 20; + List executors = new ArrayList<>(threadCount); + for (int i = 0; i < threadCount; i++) { + executors.add(Executors.newSingleThreadExecutor()); + } + List> futureList = + new ArrayList<>(threadCount); + for (int i = 0; i < threadCount; i++) { + final CompletableFuture future = + new CompletableFuture<>(); + CompletableFuture.supplyAsync(() -> { + try { + future.complete(blockManager + .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner, + new ExcludeList())); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + }, executors.get(i)); + futureList.add(future); + } + try { + CompletableFuture + .allOf(futureList.toArray(new CompletableFuture[futureList.size()])) + .get(); + } catch (Exception e) { + Assert.fail("testAllocateBlockInParallel failed"); + } + } + @Test public void testAllocateOversizedBlock() throws Exception { eventQueue.fireEvent(SCMEvents.SAFE_MODE_STATUS, safeModeStatus); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java index a562efeab4..bfdeac5263 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java @@ -43,14 +43,20 @@ import java.io.File; import java.io.IOException; -import java.util.Iterator; -import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.Iterator; +import java.util.Optional; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; + import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -144,6 +150,43 @@ public void testallocateContainerDistributesAllocation() throws Exception { Assert.assertTrue(pipelineList.size() > 5); } + @Test + public void testAllocateContainerInParallel() throws Exception { + int threadCount = 20; + List executors = new ArrayList<>(threadCount); + for (int i = 0; i < threadCount; i++) { + executors.add(Executors.newSingleThreadExecutor()); + } + List> futureList = + new ArrayList<>(threadCount); + for (int i = 0; i < threadCount; i++) { + final CompletableFuture future = new CompletableFuture<>(); + CompletableFuture.supplyAsync(() -> { + try { + ContainerInfo containerInfo = containerManager + .allocateContainer(xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + + Assert.assertNotNull(containerInfo); + Assert.assertNotNull(containerInfo.getPipelineID()); + future.complete(containerInfo); + return containerInfo; + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + }, executors.get(i)); + futureList.add(future); + } + try { + CompletableFuture + .allOf(futureList.toArray(new CompletableFuture[futureList.size()])) + .get(); + } catch (Exception e) { + Assert.fail("testAllocateBlockInParallel failed"); + } + } + @Test public void testGetContainer() throws IOException { ContainerInfo containerInfo = containerManager.allocateContainer(