HDDS-462. Optimize ContainerStateMap#getMatchingContainerIDs in SCM. Contributed by Nanda kumar.

This commit is contained in:
Nanda kumar 2018-09-15 23:11:39 +05:30
parent a65c3ea91c
commit c9fa081897
9 changed files with 224 additions and 50 deletions

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.hdds.scm.container;
import com.google.common.base.Preconditions;
import org.apache.commons.math3.util.MathUtils;
import org.apache.commons.lang3.builder.CompareToBuilder;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
/**
* Container ID is an integer that is a value between 1..MAX_CONTAINER ID.
@ -48,7 +50,6 @@ public ContainerID(long id) {
* @return ContainerID.
*/
public static ContainerID valueof(long containerID) {
Preconditions.checkState(containerID > 0);
return new ContainerID(containerID);
}
@ -66,28 +67,37 @@ public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerID that = (ContainerID) o;
return id == that.id;
return new EqualsBuilder()
.append(getId(), that.getId())
.isEquals();
}
@Override
public int hashCode() {
return MathUtils.hash(id);
return new HashCodeBuilder(61, 71)
.append(getId())
.toHashCode();
}
@Override
public int compareTo(Object o) {
Preconditions.checkNotNull(o);
if (o instanceof ContainerID) {
return Long.compare(((ContainerID) o).getId(), this.getId());
if(getClass() != o.getClass()) {
throw new ClassCastException("ContainerID class expected. found:" +
o.getClass().toString());
}
throw new IllegalArgumentException("Object O, should be an instance " +
"of ContainerID");
ContainerID that = (ContainerID) o;
return new CompareToBuilder()
.append(this.getId(), that.getId())
.build();
}
@Override

View File

@ -106,6 +106,13 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
this.replicationType = repType;
}
public ContainerInfo(ContainerInfo info) {
this(info.getContainerID(), info.getState(), info.getPipelineID(),
info.getAllocatedBytes(), info.getUsedBytes(), info.getNumberOfKeys(),
info.getStateEnterTime(), info.getOwner(),
info.getDeleteTransactionId(), info.getReplicationFactor(),
info.getReplicationType());
}
/**
* Needed for serialization findbugs.
*/
@ -238,7 +245,8 @@ public void setOwner(String owner) {
@Override
public String toString() {
return "ContainerInfo{"
+ "state=" + state
+ "id=" + containerID
+ ", state=" + state
+ ", pipelineID=" + pipelineID
+ ", stateEnterTime=" + stateEnterTime
+ ", owner=" + owner

View File

@ -440,7 +440,7 @@ private ContainerInfo findContainerWithSpace(long size,
NavigableSet<ContainerID> searchSet, String owner) {
// Get the container with space to meet our request.
for (ContainerID id : searchSet) {
ContainerInfo containerInfo = containers.getContainerInfo(id.getId());
ContainerInfo containerInfo = containers.getContainerInfo(id);
if (containerInfo.getAllocatedBytes() + size <= this.containerSize) {
containerInfo.updateLastUsedTime();
@ -502,7 +502,7 @@ public ContainerWithPipeline getContainer(PipelineSelector selector,
* @throws IOException
*/
public ContainerInfo getContainer(ContainerID containerID) {
return containers.getContainerInfo(containerID.getId());
return containers.getContainerInfo(containerID);
}
@Override

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.hdds.scm.container.states;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
/**
* Key for the Caching layer for Container Query.
*/
public class ContainerQueryKey {
private final HddsProtos.LifeCycleState state;
private final String owner;
private final HddsProtos.ReplicationFactor factor;
private final HddsProtos.ReplicationType type;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerQueryKey that = (ContainerQueryKey) o;
return new EqualsBuilder()
.append(getState(), that.getState())
.append(getOwner(), that.getOwner())
.append(getFactor(), that.getFactor())
.append(getType(), that.getType())
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(61, 71)
.append(getState())
.append(getOwner())
.append(getFactor())
.append(getType())
.toHashCode();
}
/**
* Constructor for ContainerQueryKey.
* @param state LifeCycleState
* @param owner - Name of the Owner.
* @param factor Replication Factor.
* @param type - Replication Type.
*/
public ContainerQueryKey(HddsProtos.LifeCycleState state, String owner,
HddsProtos.ReplicationFactor factor, HddsProtos.ReplicationType type) {
this.state = state;
this.owner = owner;
this.factor = factor;
this.type = type;
}
/**
* Returns the state of containers which this key represents.
* @return LifeCycleState
*/
public HddsProtos.LifeCycleState getState() {
return state;
}
/**
* Returns the owner of containers which this key represents.
* @return Owner
*/
public String getOwner() {
return owner;
}
/**
* Returns the replication factor of containers which this key represents.
* @return ReplicationFactor
*/
public HddsProtos.ReplicationFactor getFactor() {
return factor;
}
/**
* Returns the replication type of containers which this key represents.
* @return ReplicationType
*/
public HddsProtos.ReplicationType getType() {
return type;
}
}

View File

@ -41,6 +41,7 @@
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.CONTAINER_EXISTS;
@ -105,6 +106,7 @@ public class ContainerStateMap {
private final Map<ContainerID, Set<DatanodeDetails>> contReplicaMap;
private final static NavigableSet<ContainerID> EMPTY_SET =
Collections.unmodifiableNavigableSet(new TreeSet<>());
private final Map<ContainerQueryKey, NavigableSet<ContainerID>> resultCache;
// Container State Map lock should be held before calling into
// Update ContainerAttributes. The consistency of ContainerAttributes is
@ -127,6 +129,7 @@ public ContainerStateMap() {
// new ReentrantLock(),
// 1000,
// 300));
resultCache = new ConcurrentHashMap<>();
}
/**
@ -158,6 +161,10 @@ public void addContainer(ContainerInfo info)
if (info.isContainerOpen()) {
openPipelineMap.insert(info.getPipelineID(), id);
}
// Flush the cache of this container type, will be added later when
// get container queries are executed.
flushCache(info);
LOG.trace("Created container with {} successfully.", id);
} finally {
lock.writeLock().unlock();
@ -181,10 +188,19 @@ public ContainerInfo getContainerInfo(ContainerInfo info) {
* @return container info, if found.
*/
public ContainerInfo getContainerInfo(long containerID) {
return getContainerInfo(ContainerID.valueof(containerID));
}
/**
* Returns the latest state of Container from SCM's Container State Map.
*
* @param containerID - ContainerID
* @return container info, if found.
*/
public ContainerInfo getContainerInfo(ContainerID containerID) {
lock.readLock().lock();
try {
ContainerID id = new ContainerID(containerID);
return containerMap.get(id);
return containerMap.get(containerID);
} finally {
lock.readLock().unlock();
}
@ -304,6 +320,7 @@ public void updateContainerInfo(ContainerInfo info) throws SCMException {
if (currentInfo == null) {
throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
}
flushCache(info, currentInfo);
containerMap.put(info.containerID(), info);
} finally {
lock.writeLock().unlock();
@ -329,6 +346,11 @@ public void updateState(ContainerInfo info, LifeCycleState currentState,
lock.writeLock().lock();
try {
try {
// Just flush both old and new data sets from the result cache.
ContainerInfo newInfo = new ContainerInfo(info);
newInfo.setState(newState);
flushCache(newInfo, info);
currentInfo = containerMap.get(id);
if (currentInfo == null) {
@ -481,6 +503,11 @@ public NavigableSet<ContainerID> getMatchingContainerIDs(
lock.readLock().lock();
try {
ContainerQueryKey queryKey =
new ContainerQueryKey(state, owner, factor, type);
if(resultCache.containsKey(queryKey)){
return resultCache.get(queryKey);
}
// If we cannot meet any one condition we return EMPTY_SET immediately.
// Since when we intersect these sets, the result will be empty if any
@ -517,6 +544,7 @@ public NavigableSet<ContainerID> getMatchingContainerIDs(
for (int x = 1; x < sets.length; x++) {
currentSet = intersectSets(currentSet, sets[x]);
}
resultCache.put(queryKey, currentSet);
return currentSet;
} finally {
lock.readLock().unlock();
@ -566,4 +594,14 @@ private NavigableSet<ContainerID>[] sortBySize(
}
return sets;
}
private void flushCache(ContainerInfo... containerInfos) {
for (ContainerInfo containerInfo : containerInfos) {
ContainerQueryKey key = new ContainerQueryKey(containerInfo.getState(),
containerInfo.getOwner(), containerInfo.getReplicationFactor(),
containerInfo.getReplicationType());
resultCache.remove(key);
}
}
}

View File

@ -202,8 +202,7 @@ public void testProcessReportDetectNewContainers() throws SCMException {
map.insertNewDatanode(key, values);
final int newCount = 100;
// This is not a mistake, the treeset seems to be reverse sorted.
ContainerID last = values.first();
ContainerID last = values.last();
TreeSet<ContainerID> addedContainers = new TreeSet<>();
for (int x = 1; x <= newCount; x++) {
long cTemp = last.getId() + x;
@ -244,7 +243,7 @@ public void testProcessReportDetectMissingContainers() throws SCMException {
final int removeCount = 100;
Random r = new Random();
ContainerID first = values.last();
ContainerID first = values.first();
TreeSet<ContainerID> removedContainers = new TreeSet<>();
// Pick a random container to remove it is ok to collide no issues.
@ -290,7 +289,7 @@ public void testProcessReportDetectNewAndMissingContainers() throws
final int removeCount = 100;
Random r = new Random();
ContainerID first = values.last();
ContainerID first = values.first();
TreeSet<ContainerID> removedContainers = new TreeSet<>();
// Pick a random container to remove it is ok to collide no issues.

View File

@ -46,14 +46,20 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
/**
* Benchmarks ContainerStateMap class.
*/
@State(Scope.Thread)
public class BenchMarkContainerStateMap {
private ContainerStateMap stateMap;
private AtomicInteger containerID;
private AtomicInteger runCount;
private static int errorFrequency = 100;
@Setup(Level.Trial)
public void initialize() throws IOException {
stateMap = new ContainerStateMap();
runCount = new AtomicInteger(0);
Pipeline pipeline = createSingleNodePipeline(UUID.randomUUID().toString());
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null.");
int currentCount = 1;
@ -80,7 +86,7 @@ public void initialize() throws IOException {
e.printStackTrace();
}
}
for (int y = currentCount; y < 2000; y++) {
for (int y = currentCount; y < 50000; y++) {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(OPEN)
@ -169,9 +175,15 @@ public static Pipeline createPipeline(String containerName,
@Benchmark
public void createContainerBenchMark(BenchMarkContainerStateMap state,
Blackhole bh) throws IOException {
ContainerInfo containerInfo = getContainerInfo(state);
state.stateMap.addContainer(containerInfo);
}
private ContainerInfo getContainerInfo(BenchMarkContainerStateMap state)
throws IOException {
Pipeline pipeline = createSingleNodePipeline(UUID.randomUUID().toString());
int cid = state.containerID.incrementAndGet();
ContainerInfo containerInfo = new ContainerInfo.Builder()
return new ContainerInfo.Builder()
.setState(CLOSED)
.setPipelineID(pipeline.getId())
.setReplicationType(pipeline.getType())
@ -186,14 +198,16 @@ public void createContainerBenchMark(BenchMarkContainerStateMap state,
.setContainerID(cid)
.setDeleteTransactionId(0)
.build();
state.stateMap.addContainer(containerInfo);
}
@Benchmark
public void getMatchingContainerBenchMark(BenchMarkContainerStateMap state,
Blackhole bh) {
Blackhole bh) throws IOException {
if(runCount.incrementAndGet() % errorFrequency == 0) {
state.stateMap.addContainer(getContainerInfo(state));
}
bh.consume(state.stateMap
.getMatchingContainerIDs(OPEN, "BILBO", ReplicationFactor.ONE,
.getMatchingContainerIDs(OPEN, "OZONE", ReplicationFactor.ONE,
ReplicationType.STAND_ALONE));
}
}

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.ozone.genesis;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine
@ -51,7 +49,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@ -64,15 +61,15 @@
.GetKeyRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
/**
* Benchmarks DatanodeDispatcher class.
*/
@State(Scope.Benchmark)
public class BenchMarkDatanodeDispatcher {
private String baseDir;
private String datanodeUuid;
private Pipeline pipeline;
private HddsDispatcher dispatcher;
private ByteString data;
private Random random;
@ -80,20 +77,17 @@ public class BenchMarkDatanodeDispatcher {
private AtomicInteger keyCount;
private AtomicInteger chunkCount;
final int initContainers = 100;
final int initKeys = 50;
final int initChunks = 100;
private static final int INIT_CONTAINERS = 100;
private static final int INIT_KEYS = 50;
private static final int INIT_CHUNKS = 100;
List<Long> containers;
List<Long> keys;
List<String> chunks;
private List<Long> containers;
private List<Long> keys;
private List<String> chunks;
@Setup(Level.Trial)
public void initialize() throws IOException {
datanodeUuid = UUID.randomUUID().toString();
pipeline = new Pipeline("127.0.0.1",
LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
ReplicationFactor.ONE, PipelineID.randomId());
// 1 MB of data
data = ByteString.copyFromUtf8(RandomStringUtils.randomAscii(1048576));
@ -121,7 +115,7 @@ public void initialize() throws IOException {
chunks = new ArrayList<>();
// Create containers
for (int x = 0; x < initContainers; x++) {
for (int x = 0; x < INIT_CONTAINERS; x++) {
long containerID = Time.getUtcTime() + x;
ContainerCommandRequestProto req = getCreateContainerCommand(containerID);
dispatcher.dispatch(req);
@ -129,21 +123,21 @@ public void initialize() throws IOException {
containerCount.getAndIncrement();
}
for (int x = 0; x < initKeys; x++) {
for (int x = 0; x < INIT_KEYS; x++) {
keys.add(Time.getUtcTime()+x);
}
for (int x = 0; x < initChunks; x++) {
for (int x = 0; x < INIT_CHUNKS; x++) {
chunks.add("chunk-" + x);
}
// Add chunk and keys to the containers
for (int x = 0; x < initKeys; x++) {
for (int x = 0; x < INIT_KEYS; x++) {
String chunkName = chunks.get(x);
chunkCount.getAndIncrement();
long key = keys.get(x);
keyCount.getAndIncrement();
for (int y = 0; y < initContainers; y++) {
for (int y = 0; y < INIT_CONTAINERS; y++) {
long containerID = containers.get(y);
BlockID blockID = new BlockID(containerID, key);
dispatcher
@ -294,7 +288,7 @@ public void getKey(BenchMarkDatanodeDispatcher bmdd) {
}
// Chunks writes from benchmark only reaches certain containers
// Use initChunks instead of updated counters to guarantee
// Use INIT_CHUNKS instead of updated counters to guarantee
// key/chunks are readable.
private BlockID getRandomBlockID() {
@ -302,15 +296,15 @@ private BlockID getRandomBlockID() {
}
private long getRandomContainerID() {
return containers.get(random.nextInt(initContainers));
return containers.get(random.nextInt(INIT_CONTAINERS));
}
private long getRandomKeyID() {
return keys.get(random.nextInt(initKeys));
return keys.get(random.nextInt(INIT_KEYS));
}
private String getRandomChunkToRead() {
return chunks.get(random.nextInt(initChunks));
return chunks.get(random.nextInt(INIT_CHUNKS));
}
private String getNewChunkToWrite() {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.genesis;
import org.openjdk.jmh.profile.StackProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
@ -39,15 +40,15 @@ private Genesis() {
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(BenchMarkContainerStateMap.class.getSimpleName())
.include(BenchMarkMetadataStoreReads.class.getSimpleName())
.include(BenchMarkMetadataStoreWrites.class.getSimpleName())
.include(BenchMarkDatanodeDispatcher.class.getSimpleName())
// .include(BenchMarkMetadataStoreReads.class.getSimpleName())
// .include(BenchMarkMetadataStoreWrites.class.getSimpleName())
// .include(BenchMarkDatanodeDispatcher.class.getSimpleName())
// Commenting this test out, till we support either a command line or a config
// file based ability to run tests.
// .include(BenchMarkRocksDbStore.class.getSimpleName())
.warmupIterations(5)
.measurementIterations(20)
.addProfiler(GenesisMemoryProfiler.class)
.addProfiler(StackProfiler.class)
.shouldDoGC(true)
.forks(1)
.build();