HDDS-1106. Introduce queryMap in PipelineManager. Contributed by Lokesh Jain.

This commit is contained in:
Yiqun Lin 2019-02-18 22:35:23 +08:00
parent db4d1a1e2f
commit f2fb6536dc
2 changed files with 111 additions and 3 deletions

View File

@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline; package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -27,6 +28,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@@ -42,15 +44,27 @@ class PipelineStateMap {
private final Map<PipelineID, Pipeline> pipelineMap; private final Map<PipelineID, Pipeline> pipelineMap;
private final Map<PipelineID, NavigableSet<ContainerID>> pipeline2container; private final Map<PipelineID, NavigableSet<ContainerID>> pipeline2container;
private final Map<PipelineQuery, List<Pipeline>> query2OpenPipelines;
PipelineStateMap() { PipelineStateMap() {
// TODO: Use TreeMap for range operations? // TODO: Use TreeMap for range operations?
this.pipelineMap = new HashMap<>(); pipelineMap = new HashMap<>();
this.pipeline2container = new HashMap<>(); pipeline2container = new HashMap<>();
query2OpenPipelines = new HashMap<>();
initializeQueryMap();
} }
private void initializeQueryMap() {
for (ReplicationType type : ReplicationType.values()) {
for (ReplicationFactor factor : ReplicationFactor.values()) {
query2OpenPipelines
.put(new PipelineQuery(type, factor), new CopyOnWriteArrayList<>());
}
}
}
/** /**
* Adds provided pipeline in the data structures. * Adds provided pipeline in the data structures.
* *
@@ -70,6 +84,9 @@ void addPipeline(Pipeline pipeline) throws IOException {
.format("Duplicate pipeline ID %s detected.", pipeline.getId())); .format("Duplicate pipeline ID %s detected.", pipeline.getId()));
} }
pipeline2container.put(pipeline.getId(), new TreeSet<>()); pipeline2container.put(pipeline.getId(), new TreeSet<>());
if (pipeline.getPipelineState() == PipelineState.OPEN) {
query2OpenPipelines.get(new PipelineQuery(pipeline)).add(pipeline);
}
} }
/** /**
@@ -188,6 +205,10 @@ List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
Preconditions.checkNotNull(factor, "Replication factor cannot be null"); Preconditions.checkNotNull(factor, "Replication factor cannot be null");
Preconditions.checkNotNull(state, "Pipeline state cannot be null"); Preconditions.checkNotNull(state, "Pipeline state cannot be null");
if (state == PipelineState.OPEN) {
return Collections.unmodifiableList(
query2OpenPipelines.get(new PipelineQuery(type, factor)));
}
return pipelineMap.values().stream().filter( return pipelineMap.values().stream().filter(
pipeline -> pipeline.getType() == type pipeline -> pipeline.getType() == type
&& pipeline.getPipelineState() == state && pipeline.getPipelineState() == state
@@ -293,7 +314,52 @@ Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state)
Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null"); Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null");
final Pipeline pipeline = getPipeline(pipelineID); final Pipeline pipeline = getPipeline(pipelineID);
return pipelineMap.compute(pipelineID, Pipeline updatedPipeline = pipelineMap.compute(pipelineID,
(id, p) -> Pipeline.newBuilder(pipeline).setState(state).build()); (id, p) -> Pipeline.newBuilder(pipeline).setState(state).build());
PipelineQuery query = new PipelineQuery(pipeline);
if (updatedPipeline.getPipelineState() == PipelineState.OPEN) {
// for transition to OPEN state add pipeline to query2OpenPipelines
query2OpenPipelines.get(query).add(updatedPipeline);
} else if (updatedPipeline.getPipelineState() == PipelineState.CLOSED) {
// for transition from OPEN to CLOSED state remove pipeline from
// query2OpenPipelines
query2OpenPipelines.get(query).remove(pipeline);
}
return updatedPipeline;
}
private class PipelineQuery {
private ReplicationType type;
private ReplicationFactor factor;
PipelineQuery(ReplicationType type, ReplicationFactor factor) {
this.type = type;
this.factor = factor;
}
PipelineQuery(Pipeline pipeline) {
type = pipeline.getType();
factor = pipeline.getFactor();
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!this.getClass().equals(other.getClass())) {
return false;
}
PipelineQuery otherQuery = (PipelineQuery) other;
return type == otherQuery.type && factor == otherQuery.factor;
}
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(type)
.append(factor)
.toHashCode();
}
} }
} }

View File

@ -419,6 +419,48 @@ public void testOpenPipeline() throws IOException {
removePipeline(pipeline); removePipeline(pipeline);
} }
@Test
public void testQueryPipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, 3);
// pipeline in allocated state should not be reported
stateManager.addPipeline(pipeline);
Assert.assertEquals(0, stateManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
.size());
// pipeline in open state should be reported
stateManager.openPipeline(pipeline.getId());
Assert.assertEquals(1, stateManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
.size());
Pipeline pipeline2 = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, 3);
pipeline2 = Pipeline.newBuilder(pipeline2)
.setState(Pipeline.PipelineState.OPEN)
.build();
// pipeline in open state should be reported
stateManager.addPipeline(pipeline2);
Assert.assertEquals(2, stateManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
.size());
// pipeline in closed state should not be reported
stateManager.finalizePipeline(pipeline2.getId());
Assert.assertEquals(1, stateManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
.size());
// clean up
removePipeline(pipeline);
removePipeline(pipeline2);
}
private void removePipeline(Pipeline pipeline) throws IOException { private void removePipeline(Pipeline pipeline) throws IOException {
stateManager.finalizePipeline(pipeline.getId()); stateManager.finalizePipeline(pipeline.getId());
stateManager.removePipeline(pipeline.getId()); stateManager.removePipeline(pipeline.getId());