From 9317a61f3cdc5ca91c6934eec9898cee3d65441a Mon Sep 17 00:00:00 2001 From: Yiqun Lin Date: Thu, 8 Nov 2018 23:41:43 +0800 Subject: [PATCH] HDDS-802. Container State Manager should get open pipelines for allocating container. Contributed by Lokesh Jain. --- .../scm/container/ContainerStateManager.java | 4 +- .../hdds/scm/pipeline/PipelineManager.java | 3 + .../scm/pipeline/PipelineStateManager.java | 5 ++ .../hdds/scm/pipeline/PipelineStateMap.java | 22 +++++++ .../hdds/scm/pipeline/SCMPipelineManager.java | 11 ++++ .../pipeline/TestPipelineStateManager.java | 61 +++++++++++++++++-- 6 files changed, 100 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 87505c3006..74c8dcba2b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -248,8 +248,8 @@ ContainerInfo allocateContainer(final PipelineManager pipelineManager, try { pipeline = pipelineManager.createPipeline(type, replicationFactor); } catch (IOException e) { - final List pipelines = - pipelineManager.getPipelines(type, replicationFactor); + final List pipelines = pipelineManager + .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN); if (pipelines.isEmpty()) { throw new IOException("Could not allocate container"); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 04ec535900..cce09f3dd6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -46,6 +46,9 @@ Pipeline createPipeline(ReplicationType type, ReplicationFactor factor, List getPipelines(ReplicationType type, ReplicationFactor factor); + List getPipelines(ReplicationType type, + ReplicationFactor factor, Pipeline.PipelineState state); + void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 67f74d309d..9f95378882 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -64,6 +64,11 @@ List getPipelines(ReplicationType type, ReplicationFactor factor) { return pipelineStateMap.getPipelines(type, factor); } + List getPipelines(ReplicationType type, ReplicationFactor factor, + PipelineState state) { + return pipelineStateMap.getPipelines(type, factor, state); + } + List getPipelines(ReplicationType type, PipelineState... states) { return pipelineStateMap.getPipelines(type, states); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java index 7b69491319..85790b2ee6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java @@ -165,6 +165,28 @@ List getPipelines(ReplicationType type, PipelineState... states) { .collect(Collectors.toList()); } + /** + * Get list of pipeline corresponding to specified replication type, + * replication factor and pipeline state. + * + * @param type - ReplicationType + * @param state - Required PipelineState + * @return List of pipelines with specified replication type, + * replication factor and pipeline state + */ + List getPipelines(ReplicationType type, ReplicationFactor factor, + PipelineState state) { + Preconditions.checkNotNull(type, "Replication type cannot be null"); + Preconditions.checkNotNull(factor, "Replication factor cannot be null"); + Preconditions.checkNotNull(state, "Pipeline state cannot be null"); + + return pipelineMap.values().stream().filter( + pipeline -> pipeline.getType() == type + && pipeline.getPipelineState() == state + && pipeline.getFactor() == factor) + .collect(Collectors.toList()); + } + /** * Get set of containerIDs corresponding to a pipeline. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 1c217482c6..5e8d0dc2f7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -165,6 +165,17 @@ public List getPipelines(ReplicationType type, } } + @Override + public List getPipelines(ReplicationType type, + ReplicationFactor factor, Pipeline.PipelineState state) { + lock.readLock().lock(); + try { + return stateManager.getPipelines(type, factor, state); + } finally { + lock.readLock().unlock(); + } + } + @Override public void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java index fd6f76b88d..0f5692e7c7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java @@ -157,8 +157,8 @@ public void testGetPipelinesByTypeAndFactor() throws IOException { stateManager.getPipelines(type, factor); Assert.assertEquals(15, pipelines1.size()); pipelines1.stream().forEach(p -> { - Assert.assertEquals(p.getType(), type); - Assert.assertEquals(p.getFactor(), factor); + Assert.assertEquals(type, p.getType()); + Assert.assertEquals(factor, p.getFactor()); }); } } @@ -203,8 +203,8 @@ public void testGetPipelinesByTypeAndState() throws IOException { .getPipelines(type, Pipeline.PipelineState.OPEN); Assert.assertEquals(5, pipelines1.size()); pipelines1.forEach(p -> { - Assert.assertEquals(p.getType(), type); - Assert.assertEquals(p.getPipelineState(), Pipeline.PipelineState.OPEN); + Assert.assertEquals(type, p.getType()); + Assert.assertEquals(Pipeline.PipelineState.OPEN, p.getPipelineState()); }); pipelines1 = stateManager @@ -219,6 +219,59 @@ public void testGetPipelinesByTypeAndState() throws IOException { } } + @Test + public void testGetPipelinesByTypeFactorAndState() throws IOException { + Set pipelines = new HashSet<>(); + for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType + .values()) { + for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor + .values()) { + for (int i = 0; i < 5; i++) { + // 5 pipelines in allocated state for each type and factor + Pipeline pipeline = + createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + pipelines.add(pipeline); + + // 5 pipelines in open state for each type and factor + pipeline = createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + stateManager.openPipeline(pipeline.getId()); + pipelines.add(pipeline); + + // 5 pipelines in closed state for each type and factor + pipeline = createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + stateManager.finalizePipeline(pipeline.getId()); + pipelines.add(pipeline); + } + } + } + + for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType + .values()) { + for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor + .values()) { + for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) { + // verify pipelines received + List pipelines1 = + stateManager.getPipelines(type, factor, state); + Assert.assertEquals(5, pipelines1.size()); + pipelines1.forEach(p -> { + Assert.assertEquals(type, p.getType()); + Assert.assertEquals(factor, p.getFactor()); + Assert.assertEquals(state, p.getPipelineState()); + }); + } + } + } + + //clean up + for (Pipeline pipeline : pipelines) { + removePipeline(pipeline); + } + } + @Test public void testAddAndGetContainer() throws IOException { long containerID = 0;