HDFS-12522. Ozone: Remove the Priority Queues used in the Container State Manager. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2018-01-30 10:57:10 -08:00 committed by Owen O'Malley
parent 32245c78e2
commit 443425a5d9
21 changed files with 1742 additions and 366 deletions

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.ozone.scm.container.ContainerStates;
import com.google.common.base.Preconditions;
import org.apache.commons.math3.util.MathUtils;
/**
* Container ID is an integer that is a value between 1..MAX_CONTAINER ID.
* <p>
* We are creating a specific type for this to avoid mixing this with
* normal integers in code.
*/
public class ContainerID implements Comparable {
private final long id;
/**
* Constructs ContainerID.
*
* @param id int
*/
public ContainerID(long id) {
Preconditions.checkState(id > 0,
"Container ID should be a positive int");
this.id = id;
}
/**
* Factory method for creation of ContainerID.
* @param containerID long
* @return ContainerID.
*/
public static ContainerID valueof(long containerID) {
Preconditions.checkState(containerID > 0);
return new ContainerID(containerID);
}
/**
* Returns int representation of ID.
*
* @return int
*/
public long getId() {
return id;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerID that = (ContainerID) o;
return id == that.id;
}
@Override
public int hashCode() {
return MathUtils.hash(id);
}
@Override
public int compareTo(Object o) {
Preconditions.checkNotNull(o);
if (o instanceof ContainerID) {
return Long.compare(((ContainerID) o).getId(), this.getId());
}
throw new IllegalArgumentException("Object O, should be an instance " +
"of ContainerID");
}
@Override
public String toString() {
return "id=" + id;
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
/**
* Container States.
*/
package org.apache.hadoop.ozone.scm.container.ContainerStates;

View File

@ -18,14 +18,18 @@
package org.apache.hadoop.scm.container.common.helpers;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID;
import org.apache.hadoop.util.Time;
import java.util.Comparator;
/** Class wraps ozone container info. */
/**
* Class wraps ozone container info.
*/
public class ContainerInfo
implements Comparator<ContainerInfo>, Comparable<ContainerInfo> {
private OzoneProtos.LifeCycleState state;
@ -40,8 +44,9 @@ public class ContainerInfo
private long stateEnterTime;
private String owner;
private String containerName;
private long containerID;
ContainerInfo(
long containerID,
final String containerName,
OzoneProtos.LifeCycleState state,
Pipeline pipeline,
@ -50,6 +55,7 @@ public class ContainerInfo
long numberOfKeys,
long stateEnterTime,
String owner) {
this.containerID = containerID;
this.containerName = containerName;
this.pipeline = pipeline;
this.allocatedBytes = allocatedBytes;
@ -77,9 +83,14 @@ public static ContainerInfo fromProtobuf(OzoneProtos.SCMContainerInfo info) {
builder.setStateEnterTime(info.getStateEnterTime());
builder.setOwner(info.getOwner());
builder.setContainerName(info.getContainerName());
builder.setContainerID(info.getContainerID());
return builder.build();
}
public long getContainerID() {
return containerID;
}
public String getContainerName() {
return containerName;
}
@ -88,6 +99,10 @@ public OzoneProtos.LifeCycleState getState() {
return state;
}
public void setState(OzoneProtos.LifeCycleState state) {
this.state = state;
}
public long getStateEnterTime() {
return stateEnterTime;
}
@ -100,6 +115,16 @@ public long getAllocatedBytes() {
return allocatedBytes;
}
/**
* Set Allocated bytes.
*
* @param size - newly allocated bytes -- negative size is case of deletes
* can be used.
*/
public void updateAllocatedBytes(long size) {
this.allocatedBytes += size;
}
public long getUsedBytes() {
return usedBytes;
}
@ -108,8 +133,13 @@ public long getNumberOfKeys() {
return numberOfKeys;
}
public ContainerID containerID() {
return new ContainerID(getContainerID());
}
/**
* Gets the last used time from SCM's perspective.
*
* @return time in milliseconds.
*/
public long getLastUsed() {
@ -135,6 +165,7 @@ public OzoneProtos.SCMContainerInfo getProtobuf() {
builder.setNumberOfKeys(getNumberOfKeys());
builder.setState(state);
builder.setStateEnterTime(stateEnterTime);
builder.setContainerID(getContainerID());
if (getOwner() != null) {
builder.setOwner(getOwner());
@ -180,7 +211,7 @@ public boolean equals(Object o) {
// TODO : Fix this later. If we add these factors some tests fail.
// So Commenting this to continue and will enforce this with
// Changes in pipeline where we remove Container Name to
// SCMContainerinfo from Pipline.
// SCMContainerinfo from Pipeline.
// .append(pipeline.getFactor(), that.pipeline.getFactor())
// .append(pipeline.getType(), that.pipeline.getType())
.append(owner, that.owner)
@ -233,7 +264,9 @@ public int compareTo(ContainerInfo o) {
return this.compare(this, o);
}
/** Builder class for ContainerInfo. */
/**
* Builder class for ContainerInfo.
*/
public static class Builder {
private OzoneProtos.LifeCycleState state;
private Pipeline pipeline;
@ -243,6 +276,13 @@ public static class Builder {
private long stateEnterTime;
private String owner;
private String containerName;
private long containerID;
public Builder setContainerID(long id) {
Preconditions.checkState(id >= 0);
this.containerID = id;
return this;
}
public Builder setState(OzoneProtos.LifeCycleState lifeCycleState) {
this.state = lifeCycleState;
@ -286,8 +326,8 @@ public Builder setContainerName(String container) {
public ContainerInfo build() {
return new
ContainerInfo(containerName, state, pipeline, allocated, used,
keys, stateEnterTime, owner);
ContainerInfo(containerID, containerName, state, pipeline,
allocated, used, keys, stateEnterTime, owner);
}
}
}

View File

@ -133,6 +133,7 @@ message SCMContainerInfo {
required uint64 numberOfKeys = 6;
optional int64 stateEnterTime = 7;
required string owner = 8;
required int64 containerID = 9;
}
message GetScmInfoRequestProto {

View File

@ -35,6 +35,17 @@ public class ContainerReport {
private long writeCount;
private long readBytes;
private long writeBytes;
private long containerID;
public long getContainerID() {
return containerID;
}
public void setContainerID(long containerID) {
this.containerID = containerID;
}
/**
@ -87,6 +98,7 @@ public static ContainerReport getFromProtoBuf(ContainerInfo info) {
report.setWriteBytes(info.getWriteBytes());
}
report.setContainerID(info.getContainerID());
return report;
}
@ -200,6 +212,7 @@ public ContainerInfo getProtoBufMessage() {
.setWriteCount(this.getWriteCount())
.setWriteBytes(this.getWriteBytes())
.setFinalhash(this.getFinalhash())
.setContainerID(this.getContainerID())
.build();
}
}

View File

@ -40,7 +40,7 @@
.ListContainerHandler.CONTAINER_LIST;
/**
* The handler class of container-specific commands, e.g. createContainer.
* The handler class of container-specific commands, e.g. addContainer.
*/
public class ContainerCommandHandler extends OzoneCommandHandler {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
@ -29,8 +29,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
@ -55,7 +54,8 @@
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_CONTAINER_STATE;
/**
* Mapping class contains the mapping from a name to a pipeline mapping. This
@ -82,19 +82,18 @@ public class ContainerMapping implements Mapping {
*
* @param nodeManager - NodeManager so that we can get the nodes that are
* healthy to place new
* containers.
* containers.
* @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
* its nodes. This is
* passed to LevelDB and this memory is allocated in Native code space.
* CacheSize is specified
* in MB.
* @throws IOException
* passed to LevelDB and this memory is allocated in Native code space.
* CacheSize is specified
* in MB.
* @throws IOException on Failure.
*/
@SuppressWarnings("unchecked")
public ContainerMapping(
final Configuration conf, final NodeManager nodeManager, final int
cacheSizeMB)
throws IOException {
cacheSizeMB) throws IOException {
this.nodeManager = nodeManager;
this.cacheSize = cacheSizeMB;
@ -113,7 +112,7 @@ public ContainerMapping(
this.pipelineSelector = new PipelineSelector(nodeManager, conf);
this.containerStateManager =
new ContainerStateManager(conf, this, this.cacheSize * OzoneConsts.MB);
new ContainerStateManager(conf, this);
this.containerCloseThreshold = conf.getFloat(
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@ -128,7 +127,9 @@ public ContainerMapping(
containerLeaseManager.start();
}
/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public ContainerInfo getContainer(final String containerName) throws
IOException {
@ -142,16 +143,19 @@ public ContainerInfo getContainer(final String containerName) throws
"Specified key does not exist. key : " + containerName,
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
containerInfo =
ContainerInfo.fromProtobuf(OzoneProtos.SCMContainerInfo.PARSER
.parseFrom(containerBytes));
OzoneProtos.SCMContainerInfo temp = OzoneProtos.SCMContainerInfo.PARSER
.parseFrom(containerBytes);
containerInfo = ContainerInfo.fromProtobuf(temp);
return containerInfo;
} finally {
lock.unlock();
}
}
/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public List<ContainerInfo> listContainer(String startName,
String prefixName, int count) throws IOException {
@ -188,7 +192,7 @@ public List<ContainerInfo> listContainer(String startName,
*
* @param replicationFactor - replication factor of the container.
* @param containerName - Name of the container.
* @param owner
* @param owner - The string name of the Service that owns this container.
* @return - Pipeline that makes up this container.
* @throws IOException - Exception
*/
@ -201,7 +205,8 @@ public ContainerInfo allocateContainer(
throws IOException {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
ContainerInfo containerInfo = null;
ContainerInfo containerInfo;
if (!nodeManager.isOutOfChillMode()) {
throw new SCMException(
"Unable to create container while in chill mode",
@ -219,7 +224,8 @@ public ContainerInfo allocateContainer(
}
containerInfo =
containerStateManager.allocateContainer(
pipelineSelector, type, replicationFactor, containerName, owner);
pipelineSelector, type, replicationFactor, containerName,
owner);
containerStore.put(
containerName.getBytes(encoding), containerInfo.getProtobuf()
.toByteArray());
@ -234,8 +240,8 @@ public ContainerInfo allocateContainer(
*
* @param containerName - Container name
* @throws IOException if container doesn't exist or container store failed
* to delete the
* specified key.
* to delete the
* specified key.
*/
@Override
public void deleteContainer(String containerName) throws IOException {
@ -255,7 +261,9 @@ public void deleteContainer(String containerName) throws IOException {
}
}
/** {@inheritDoc} Used by client to update container state on SCM. */
/**
* {@inheritDoc} Used by client to update container state on SCM.
*/
@Override
public OzoneProtos.LifeCycleState updateContainerState(
String containerName, OzoneProtos.LifeCycleEvent event) throws
@ -327,8 +335,10 @@ public OzoneProtos.LifeCycleState updateContainerState(
}
}
/** + * Returns the container State Manager. + * + * @return
* ContainerStateManager + */
/**
* Returns the container State Manager.
* @return ContainerStateManager
*/
@Override
public ContainerStateManager getStateManager() {
return containerStateManager;
@ -374,6 +384,7 @@ public void processContainerReports(
builder.setNumberOfKeys(containerInfo.getKeyCount());
builder.setState(oldInfo.getState());
builder.setStateEnterTime(oldInfo.getStateEnterTime());
builder.setContainerID(oldInfo.getContainerID());
if (oldInfo.getOwner() != null) {
builder.setOwner(oldInfo.getOwner());
}
@ -393,15 +404,16 @@ public void processContainerReports(
OzoneProtos.LifeCycleEvent.FINALIZE);
if (state != OzoneProtos.LifeCycleState.CLOSING) {
LOG.error("Failed to close container {}, reason : Not able to " +
"update container state, current container state: {}." +
"in state {}", containerInfo.getContainerName(), state);
"update container state, current container state: {}.",
containerInfo.getContainerName(), state);
}
}
} else {
// Container not found in our container db.
LOG.error("Error while processing container report from datanode :" +
" {}, for container: {}, reason: container doesn't exist in" +
"container database.");
"container database.", datanodeID,
containerInfo.getContainerName());
}
} finally {
lock.unlock();
@ -409,14 +421,11 @@ public void processContainerReports(
}
}
/**
* Closes this stream and releases any system resources associated with it.
* If the stream is
* already closed then invoking this method has no effect.
*
* <p>
*
* <p>As noted in {@link AutoCloseable#close()}, cases where the close may
* fail require careful
* attention. It is strongly advised to relinquish the underlying resources
@ -445,7 +454,7 @@ public void close() throws IOException {
* containerStateManager, when closing ContainerMapping, we need to update
* this in the container store.
*
* @throws IOException
* @throws IOException on failure.
*/
@VisibleForTesting
public void flushContainerInfo() throws IOException {
@ -476,7 +485,7 @@ public void flushContainerInfo() throws IOException {
containerStore.put(dbKey, newInfo.getProtobuf().toByteArray());
} else {
LOG.debug("Container state manager has container {} but not found " +
"in container store, a deleted container?",
"in container store, a deleted container?",
info.getContainerName());
}
} catch (IOException ioe) {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
@ -17,16 +17,19 @@
package org.apache.hadoop.ozone.scm.container;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID;
import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerState;
import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerStateMap;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.scm.ScmConfigKeys;
@ -39,26 +42,15 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.PriorityQueue;
import java.util.List;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import static org.apache.hadoop.ozone.scm.exceptions
.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_CONTAINER_STATE;
/**
* A container state manager keeps track of container states and returns
@ -86,7 +78,7 @@
* this container.
* <p>
* 4. Once the creation of the container is complete, the client will make
* another call to the SCM, this time specifing the containerName and the
* another call to the SCM, this time specifying the containerName and the
* COMPLETE_CREATE as the Event.
* <p>
* 5. With COMPLETE_CREATE event, the container moves to an Open State. This is
@ -125,14 +117,9 @@ public class ContainerStateManager implements Closeable {
OzoneProtos.LifeCycleEvent> stateMachine;
private final long containerSize;
private final long cacheSize;
private final long blockSize;
// A map that maintains the ContainerKey to Containers of that type ordered
// by last access time.
private final ReadWriteLock lock;
private final Queue<ContainerInfo> containerCloseQueue;
private Map<ContainerKey, PriorityQueue<ContainerInfo>> containers;
private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
private final ContainerStateMap containers;
private final AtomicLong containerCount;
/**
* Constructs a Container State Manager that tracks all containers owned by
@ -140,9 +127,9 @@ public class ContainerStateManager implements Closeable {
* <p>
* TODO : Add Container Tags so we know which containers are owned by SCM.
*/
@SuppressWarnings("unchecked")
public ContainerStateManager(Configuration configuration,
Mapping containerMapping, final long cacheSize) throws IOException {
this.cacheSize = cacheSize;
Mapping containerMapping) {
// Initialize the container state machine.
Set<OzoneProtos.LifeCycleState> finalStates = new HashSet();
@ -160,68 +147,46 @@ public ContainerStateManager(Configuration configuration,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
this.blockSize = OzoneConsts.MB * configuration.getLong(
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
lock = new ReentrantReadWriteLock();
containers = new HashMap<>();
lastUsedMap = new ConcurrentHashMap<>();
containerCount = new AtomicLong(0);
containers = new ContainerStateMap();
loadExistingContainers(containerMapping);
containerCloseQueue = new ConcurrentLinkedQueue<>();
}
/**
* Creates containers maps of following types.
* <p>
* OZONE of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED,
* CREATING, OPEN, CLOSED, DELETING, DELETED} container states
* <p>
* CBLOCK of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED,
* CREATING, OPEN, CLOSED, DELETING, DELETED} container states
* <p>
* Commented out for now: HDFS of type {Ratis, StandAlone, Chained} for each
* of these {ALLOCATED, CREATING, OPEN, CLOSED, DELETING, DELETED} container
* states
*/
private void initializeContainerMaps(String owner) {
// Called only from Ctor path, hence no lock is held.
Preconditions.checkNotNull(containers);
for (ReplicationType type : ReplicationType.values()) {
for (ReplicationFactor factor : ReplicationFactor.values()) {
for (LifeCycleState state : LifeCycleState.values()) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
PriorityQueue<ContainerInfo> queue = new PriorityQueue<>();
containers.put(key, queue);
}
}
}
}
/**
* Load containers from the container store into the containerMaps.
*
* @param containerMapping -- Mapping object containing container store.
*/
private void loadExistingContainers(Mapping containerMapping) {
List<ContainerInfo> containerList;
try {
List<String> ownerList = new ArrayList<>();
List<ContainerInfo> containerList =
containerMapping.listContainer(null, null, Integer.MAX_VALUE);
for (ContainerInfo container : containerList) {
String owner = container.getOwner();
if (ownerList.isEmpty() || !ownerList.contains(owner)) {
ownerList.add(owner);
initializeContainerMaps(owner);
}
ContainerKey key =
new ContainerKey(owner, container.getPipeline().getType(),
container.getPipeline().getFactor(), container.getState());
containers.get(key).add(container);
containerList = containerMapping.listContainer(null,
null, Integer.MAX_VALUE);
// if there are no container to load, let us return.
if (containerList == null || containerList.size() == 0) {
LOG.info("No containers to load for this cluster.");
return;
}
} catch (IOException e) {
if (!e.getMessage().equals("No container exists in current db")) {
LOG.info("Could not list the containers", e);
LOG.error("Could not list the containers", e);
}
return;
}
try {
long maxID = 0;
for (ContainerInfo container : containerList) {
containers.addContainer(container);
if (maxID < container.getContainerID()) {
maxID = container.getContainerID();
}
containerCount.set(maxID);
}
} catch (SCMException ex) {
LOG.error("Unable to create a container information. ", ex);
// Fix me, what is the proper shutdown procedure for SCM ??
// System.exit(1) // Should we exit here?
}
}
@ -230,9 +195,11 @@ private void loadExistingContainers(Mapping containerMapping) {
*
* @return the list of all container info.
*/
List<ContainerInfo> getAllContainers() {
public List<ContainerInfo> getAllContainers() {
List<ContainerInfo> list = new ArrayList<>();
containers.forEach((key, value) -> list.addAll(value));
//No Locking needed since the return value is an immutable map.
containers.getContainerMap().forEach((key, value) -> list.add(value));
return list;
}
@ -315,7 +282,7 @@ private void initializeStateMachine() {
* @param replicationFactor - Replication replicationFactor.
* @param containerName - Container Name.
* @return Container Info.
* @throws IOException
* @throws IOException on Failure.
*/
public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
.ReplicationType type, OzoneProtos.ReplicationFactor replicationFactor,
@ -335,22 +302,11 @@ public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner(owner)
.setContainerID(containerCount.incrementAndGet())
.build();
Preconditions.checkNotNull(containerInfo);
lock.writeLock().lock();
try {
ContainerKey key = new ContainerKey(owner, type, replicationFactor,
containerInfo.getState());
PriorityQueue<ContainerInfo> queue = containers.get(key);
if (queue == null) {
initializeContainerMaps(owner);
queue = containers.get(key);
}
queue.add(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
} finally {
lock.writeLock().unlock();
}
containers.addContainer(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
return containerInfo;
}
@ -360,7 +316,7 @@ public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
* @param info - ContainerInfo
* @param event - LifeCycle Event
* @return Updated ContainerInfo.
* @throws SCMException
* @throws SCMException on Failure.
*/
public ContainerInfo updateContainerState(ContainerInfo
info, OzoneProtos.LifeCycleEvent event) throws SCMException {
@ -369,7 +325,8 @@ public ContainerInfo updateContainerState(ContainerInfo
newState = this.stateMachine.getNextState(info.getState(), event);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update container state %s, " +
"reason: invalid state transition from state: %s upon event: %s.",
"reason: invalid state transition from state: %s upon " +
"event: %s.",
info.getPipeline().getContainerName(), info.getState(), event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
@ -377,191 +334,119 @@ public ContainerInfo updateContainerState(ContainerInfo
// This is a post condition after executing getNextState.
Preconditions.checkNotNull(newState);
Pipeline pipeline = info.getPipeline();
ContainerKey oldKey = new ContainerKey(info.getOwner(), pipeline.getType(),
pipeline.getFactor(), info.getState());
ContainerKey newKey = new ContainerKey(info.getOwner(), pipeline.getType(),
pipeline.getFactor(), newState);
lock.writeLock().lock();
try {
PriorityQueue<ContainerInfo> currentQueue = containers.get(oldKey);
// This should never happen, since we have initialized the map and
// queues to all possible states. No harm in asserting that info.
Preconditions.checkNotNull(currentQueue);
// TODO : Should we read this container info from the database if this
// is missing in the queue?. Right now we just add it into the queue.
// We also need a background thread that will remove unused containers
// from memory after 24 hours. This is really a low priority work item
// since typical clusters will have less than 10's of millions of open
// containers at a given time, which we can easily keep in memory.
if (currentQueue.contains(info)) {
currentQueue.remove(info);
}
PriorityQueue<ContainerInfo> nextQueue = containers.get(newKey);
Preconditions.checkNotNull(nextQueue);
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(info.getContainerName())
.setState(newState)
.setPipeline(info.getPipeline())
.setAllocatedBytes(info.getAllocatedBytes())
.setUsedBytes(info.getUsedBytes())
.setNumberOfKeys(info.getNumberOfKeys())
.setStateEnterTime(Time.monotonicNow())
.setOwner(info.getOwner())
.build();
Preconditions.checkNotNull(containerInfo);
nextQueue.add(containerInfo);
return containerInfo;
} finally {
lock.writeLock().unlock();
}
containers.updateState(info, info.getState(), newState);
return containers.getContainerInfo(info);
}
/**
* Update the container State.
* @param info - Container Info
* @return ContainerInfo
* @throws SCMException - on Error.
*/
public ContainerInfo updateContainerInfo(ContainerInfo info)
throws SCMException {
containers.updateContainerInfo(info);
return containers.getContainerInfo(info);
}
/**
* Return a container matching the attributes specified.
*
* @param size - Space needed in the Container.
* @param owner - Owner of the container {OZONE, CBLOCK}
* @param owner - Owner of the container - A specific nameservice.
* @param type - Replication Type {StandAlone, Ratis}
* @param factor - Replication Factor {ONE, THREE}
* @param state - State of the Container-- {Open, Allocated etc.}
* @return ContainerInfo
* @return ContainerInfo, null if there is no match found.
*/
public ContainerInfo getMatchingContainer(final long size,
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
lock.writeLock().lock();
try {
PriorityQueue<ContainerInfo> queue = containers.get(key);
if (queue == null) {
initializeContainerMaps(owner);
queue = containers.get(key);
}
if (queue.size() == 0) {
// We don't have any Containers of this type.
return null;
}
Iterator<ContainerInfo> iter = queue.iterator();
// Two assumptions here.
// 1. The Iteration on the heap is in ordered by the last used time.
// 2. We remove and add the node back to push the node to the end of
// the queue.
while (iter.hasNext()) {
ContainerInfo info = iter.next();
if (info.getAllocatedBytes() + size <= this.containerSize) {
queue.remove(info);
info.allocate(size);
info.updateLastUsedTime();
queue.add(info);
// Find containers that match the query spec, if no match return null.
NavigableSet<ContainerID> matchingSet =
containers.getMatchingContainerIDs(state, owner, factor, type);
if (matchingSet == null || matchingSet.size() == 0) {
return null;
}
return info;
}
// Get the last used container and find container above the last used
// container ID.
ContainerState key = new ContainerState(owner, type, factor);
ContainerID lastID = lastUsedMap.get(key);
if(lastID == null) {
lastID = matchingSet.first();
}
// There is a small issue here. The first time, we will skip the first
// container. But in most cases it will not matter.
NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
if (resultSet.size() == 0) {
resultSet = matchingSet;
}
ContainerInfo selectedContainer =
findContainerWithSpace(size, resultSet, owner);
if (selectedContainer == null) {
// If we did not find any space in the tailSet, we need to look for
// space in the headset, we need to pass true to deal with the
// situation that we have a lone container that has space. That is we
// ignored the last used container under the assumption we can find
// other containers with space, but if have a single container that is
// not true. Hence we need to include the last used container as the
// last element in the sorted set.
resultSet = matchingSet.headSet(lastID, true);
selectedContainer = findContainerWithSpace(size, resultSet, owner);
}
// Update the allocated Bytes on this container.
if(selectedContainer != null) {
selectedContainer.updateAllocatedBytes(size);
}
return selectedContainer;
}
private ContainerInfo findContainerWithSpace(long size,
NavigableSet<ContainerID> searchSet, String owner) {
// Get the container with space to meet our request.
for (ContainerID id : searchSet) {
ContainerInfo containerInfo = containers.getContainerInfo(id.getId());
if ((containerInfo.getAllocatedBytes() <= this.containerSize) &&
(containerInfo.getAllocatedBytes() <= size)) {
containerInfo.updateLastUsedTime();
ContainerState key = new ContainerState(owner,
containerInfo.getPipeline().getType(),
containerInfo.getPipeline().getFactor());
lastUsedMap.put(key, containerInfo.containerID());
return containerInfo;
}
} finally {
lock.writeLock().unlock();
}
return null;
}
@VisibleForTesting
public List<ContainerInfo> getMatchingContainers(String owner,
ReplicationType type, ReplicationFactor factor, LifeCycleState state) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
lock.readLock().lock();
try {
if (containers.get(key) == null) {
return null;
} else {
return Arrays.asList((ContainerInfo[]) containers.get(key)
.toArray(new ContainerInfo[0]));
}
} catch (Exception e) {
LOG.error("Could not get matching containers", e);
} finally {
lock.readLock().unlock();
}
return null;
/**
* Returns a set of ContainerIDs that match the Container.
*
* @param owner Owner of the Containers.
* @param type - Replication Type of the containers
* @param factor - Replication factor of the containers.
* @param state - Current State, like Open, Close etc.
* @return Set of containers that match the specific query parameters.
*/
public NavigableSet<ContainerID> getMatchingContainerIDs(
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) {
return containers.getMatchingContainerIDs(state, owner,
factor, type);
}
@Override
public void close() throws IOException {
//TODO: update container metadata db with actual allocated bytes values.
}
/**
* Class that acts as the container Key.
*/
private static class ContainerKey {
private final LifeCycleState state;
private final ReplicationType type;
private final String owner;
private final ReplicationFactor replicationFactor;
/**
* Constructs a Container Key.
*
* @param owner - Container Owners
* @param type - Replication Type.
* @param factor - Replication Factors
* @param state - LifeCycle State
*/
ContainerKey(String owner, ReplicationType type,
ReplicationFactor factor, LifeCycleState state) {
this.state = state;
this.type = type;
this.owner = owner;
this.replicationFactor = factor;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerKey that = (ContainerKey) o;
return new EqualsBuilder()
.append(state, that.state)
.append(type, that.type)
.append(owner, that.owner)
.append(replicationFactor, that.replicationFactor)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(137, 757)
.append(state)
.append(type)
.append(owner)
.append(replicationFactor)
.toHashCode();
}
@Override
public String toString() {
return "ContainerKey{" +
"state=" + state +
", type=" + type +
", owner=" + owner +
", replicationFactor=" + replicationFactor +
'}';
}
}
}

View File

@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.ozone.scm.container.ContainerStates;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_CONTAINER_STATE;
/**
* Each Attribute that we manage for a container is maintained as a map.
* <p>
* Currently we manage the following attributes for a container.
* <p>
* 1. StateMap - LifeCycleState -> Set of ContainerIDs
* 2. TypeMap - ReplicationType -> Set of ContainerIDs
* 3. OwnerMap - OwnerNames -> Set of ContainerIDs
* 4. FactorMap - ReplicationFactor -> Set of ContainerIDs
* <p>
* This means that for a cluster size of 750 PB -- we will have around 150
* Million containers, if we assume 5GB average container size.
* <p>
* That implies that these maps will take around 2/3 GB of RAM which will be
* pinned down in the SCM. This is deemed acceptable since we can tune the
* container size --say we make it 10GB average size, then we can deal with a
* cluster size of 1.5 exa bytes with the same metadata in SCMs memory.
* <p>
* Please note: **This class is not thread safe**. This used to be thread safe,
* while bench marking we found that ContainerStateMap would be taking 5
* locks for a single container insert. If we remove locks in this class,
* then we are able to perform about 540K operations per second, with the
* locks in this class it goes down to 246K operations per second. Hence we
* are going to rely on ContainerStateMap locks to maintain consistency of
* data in these classes too, since ContainerAttribute is only used by
* ContainerStateMap class.
*/
public class ContainerAttribute<T> {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerAttribute.class);
private final Map<T, NavigableSet<ContainerID>> attributeMap;
private static final NavigableSet<ContainerID> EMPTY_SET = Collections
.unmodifiableNavigableSet(new TreeSet<>());
/**
* Creates a Container Attribute map from an existing Map.
*
* @param attributeMap - AttributeMap
*/
public ContainerAttribute(Map<T, NavigableSet<ContainerID>> attributeMap) {
this.attributeMap = attributeMap;
}
/**
* Create an empty Container Attribute map.
*/
public ContainerAttribute() {
this.attributeMap = new HashMap<>();
}
/**
* Insert or update the value in the Attribute map.
*
* @param key - The key to the set where the ContainerID should exist.
* @param value - Actual Container ID.
* @throws SCMException - on Error
*/
public boolean insert(T key, ContainerID value) throws SCMException {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
if (attributeMap.containsKey(key)) {
if (attributeMap.get(key).add(value)) {
return true; //we inserted the value as it doesnt exist in the set.
} else { // Failure indicates that this ContainerID exists in the Set
if (!attributeMap.get(key).remove(value)) {
LOG.error("Failure to remove the object from the Map.Key:{}, " +
"ContainerID: {}", key, value);
throw new SCMException("Failure to remove the object from the Map",
FAILED_TO_CHANGE_CONTAINER_STATE);
}
attributeMap.get(key).add(value);
return true;
}
} else {
// This key does not exist, we need to allocate this key in the map.
// TODO: Replace TreeSet with FoldedTreeSet from HDFS Utils.
// Skipping for now, since FoldedTreeSet does not have implementations
// for headSet and TailSet. We need those calls.
this.attributeMap.put(key, new TreeSet<>());
// This should not fail, we just allocated this object.
attributeMap.get(key).add(value);
return true;
}
}
/**
* Returns true if have this bucket in the attribute map.
*
* @param key - Key to lookup
* @return true if we have the key
*/
public boolean hasKey(T key) {
Preconditions.checkNotNull(key);
return this.attributeMap.containsKey(key);
}
/**
* Returns true if we have the key and the containerID in the bucket.
*
* @param key - Key to the bucket
* @param id - container ID that we want to lookup
* @return true or false
*/
public boolean hasContainerID(T key, ContainerID id) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(id);
return this.attributeMap.containsKey(key) &&
this.attributeMap.get(key).contains(id);
}
/**
* Returns true if we have the key and the containerID in the bucket.
*
* @param key - Key to the bucket
* @param id - container ID that we want to lookup
* @return true or false
*/
public boolean hasContainerID(T key, int id) {
return hasContainerID(key, ContainerID.valueof(id));
}
/**
* Clears all entries for this key type.
*
* @param key - Key that identifies the Set.
*/
public void clearSet(T key) {
Preconditions.checkNotNull(key);
if (attributeMap.containsKey(key)) {
attributeMap.get(key).clear();
} else {
LOG.debug("key: {} does not exist in the attributeMap", key);
}
}
/**
* Removes a container ID from the set pointed by the key.
*
* @param key - key to identify the set.
* @param value - Container ID
*/
public boolean remove(T key, ContainerID value) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
if (attributeMap.containsKey(key)) {
if (!attributeMap.get(key).remove(value)) {
LOG.debug("ContainerID: {} does not exist in the set pointed by " +
"key:{}", value, key);
return false;
}
return true;
} else {
LOG.debug("key: {} does not exist in the attributeMap", key);
return false;
}
}
/**
* Returns the collection that maps to the given key.
*
* @param key - Key to the bucket.
* @return Underlying Set in immutable form.
*/
public NavigableSet<ContainerID> getCollection(T key) {
Preconditions.checkNotNull(key);
if (this.attributeMap.containsKey(key)) {
return Collections.unmodifiableNavigableSet(this.attributeMap.get(key));
}
LOG.debug("No such Key. Key {}", key);
return EMPTY_SET;
}
/**
* Moves a ContainerID from one bucket to another.
*
* @param currentKey - Current Key
* @param newKey - newKey
* @param value - ContainerID
* @throws SCMException on Error
*/
public void update(T currentKey, T newKey, ContainerID value)
throws SCMException {
Preconditions.checkNotNull(currentKey);
Preconditions.checkNotNull(newKey);
boolean removed = false;
try {
removed = remove(currentKey, value);
if (!removed) {
throw new SCMException("Unable to find key in the current key bucket",
FAILED_TO_CHANGE_CONTAINER_STATE);
}
insert(newKey, value);
} catch (SCMException ex) {
// if we removed the key, insert it back to original bucket, since the
// next insert failed.
LOG.error("error in update.", ex);
if (removed) {
insert(currentKey, value);
LOG.trace("reinserted the removed key. {}", currentKey);
}
throw ex;
}
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.ozone.scm.container.ContainerStates;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
/**
* Class that acts as the container state.
*/
public class ContainerState {
private final OzoneProtos.ReplicationType type;
private final String owner;
private final OzoneProtos.ReplicationFactor replicationFactor;
/**
* Constructs a Container Key.
*
* @param owner - Container Owners
* @param type - Replication Type.
* @param factor - Replication Factors
*/
public ContainerState(String owner, OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor) {
this.type = type;
this.owner = owner;
this.replicationFactor = factor;
}
public OzoneProtos.ReplicationType getType() {
return type;
}
public String getOwner() {
return owner;
}
public OzoneProtos.ReplicationFactor getFactor() {
return replicationFactor;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContainerState that = (ContainerState) o;
return new EqualsBuilder()
.append(type, that.type)
.append(owner, that.owner)
.append(replicationFactor, that.replicationFactor)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(137, 757)
.append(type)
.append(owner)
.append(replicationFactor)
.toHashCode();
}
@Override
public String toString() {
return "ContainerKey{" +
", type=" + type +
", owner=" + owner +
", replicationFactor=" + replicationFactor +
'}';
}
}

View File

@ -0,0 +1,402 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.ozone.scm.container.ContainerStates;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.util.AutoCloseableLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
.CONTAINER_EXISTS;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_CONTAINER_STATE;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_FIND_CONTAINER;
/**
* Container State Map acts like a unified map for various attributes that are
* used to select containers when we need allocated blocks.
* <p>
* This class provides the ability to query 4 classes of attributes. They are
* <p>
* 1. LifeCycleStates - LifeCycle States of container describe in which state
* a container is. For example, a container needs to be in Open State for a
* client to able to write to it.
* <p>
* 2. Owners - Each instance of Name service, for example, Namenode of HDFS or
* Key Space Manager (KSM) of Ozone or CBlockServer -- is an owner. It is
* possible to have many KSMs for a Ozone cluster and only one SCM. But SCM
* keeps the data from each KSM in separate bucket, never mixing them. To
* write data, often we have to find all open containers for a specific owner.
* <p>
* 3. ReplicationType - The clients are allowed to specify what kind of
* replication pipeline they want to use. Each Container exists on top of a
* pipeline, so we need to get ReplicationType that is specified by the user.
* <p>
* 4. ReplicationFactor - The replication factor represents how many copies
* of data should be made, right now we support 2 different types, ONE
* Replica and THREE Replica. User can specify how many copies should be made
* for a ozone key.
* <p>
* The most common access pattern of this class is to select a container based
* on all these parameters, for example, when allocating a block we will
* select a container that belongs to user1, with Ratis replication which can
* make 3 copies of data. The fact that we will look for open containers by
* default and if we cannot find them we will add new containers.
*/
public class ContainerStateMap {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerStateMap.class);
private final ContainerAttribute<LifeCycleState> lifeCycleStateMap;
private final ContainerAttribute<String> ownerMap;
private final ContainerAttribute<ReplicationFactor> factorMap;
private final ContainerAttribute<ReplicationType> typeMap;
private final Map<ContainerID, ContainerInfo> containerMap;
private final static NavigableSet<ContainerID> EMPTY_SET =
Collections.unmodifiableNavigableSet(new TreeSet<>());
// Container State Map lock should be held before calling into
// Update ContainerAttributes. The consistency of ContainerAttributes is
// protected by this lock.
private final AutoCloseableLock autoLock;
/**
* Create a ContainerStateMap.
*/
public ContainerStateMap() {
lifeCycleStateMap = new ContainerAttribute<>();
ownerMap = new ContainerAttribute<>();
factorMap = new ContainerAttribute<>();
typeMap = new ContainerAttribute<>();
containerMap = new HashMap<>();
autoLock = new AutoCloseableLock();
// new InstrumentedLock(getClass().getName(), LOG,
// new ReentrantLock(),
// 1000,
// 300));
}
/**
* Adds a ContainerInfo Entry in the ContainerStateMap.
*
* @param info - container info
* @throws SCMException - throws if create failed.
*/
public void addContainer(ContainerInfo info)
throws SCMException {
try (AutoCloseableLock lock = autoLock.acquire()) {
ContainerID id = ContainerID.valueof(info.getContainerID());
if (containerMap.putIfAbsent(id, info) != null) {
LOG.debug("Duplicate container ID detected. {}", id);
throw new
SCMException("Duplicate container ID detected.",
CONTAINER_EXISTS);
}
lifeCycleStateMap.insert(info.getState(), id);
ownerMap.insert(info.getOwner(), id);
factorMap.insert(info.getPipeline().getFactor(), id);
typeMap.insert(info.getPipeline().getType(), id);
LOG.trace("Created container with {} successfully.", id);
}
}
/**
* Returns the latest state of Container from SCM's Container State Map.
*
* @param info - ContainerInfo
* @return ContainerInfo
*/
public ContainerInfo getContainerInfo(ContainerInfo info) {
return getContainerInfo(info.getContainerID());
}
/**
* Returns the latest state of Container from SCM's Container State Map.
*
* @param containerID - int
* @return container info, if found.
*/
public ContainerInfo getContainerInfo(long containerID) {
ContainerID id = new ContainerID(containerID);
return containerMap.get(id);
}
/**
* Returns the full container Map.
*
* @return - Map
*/
public Map<ContainerID, ContainerInfo> getContainerMap() {
try (AutoCloseableLock lock = autoLock.acquire()) {
return Collections.unmodifiableMap(containerMap);
}
}
/**
* Just update the container State.
* @param info ContainerInfo.
*/
public void updateContainerInfo(ContainerInfo info) throws SCMException {
Preconditions.checkNotNull(info);
ContainerInfo currentInfo = null;
try (AutoCloseableLock lock = autoLock.acquire()) {
currentInfo = containerMap.get(
ContainerID.valueof(info.getContainerID()));
if (currentInfo == null) {
throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
}
containerMap.put(info.containerID(), info);
}
}
/**
* Update the State of a container.
*
* @param info - ContainerInfo
* @param currentState - CurrentState
* @param newState - NewState.
* @throws SCMException - in case of failure.
*/
public void updateState(ContainerInfo info, LifeCycleState currentState,
LifeCycleState newState) throws SCMException {
Preconditions.checkNotNull(currentState);
Preconditions.checkNotNull(newState);
ContainerID id = new ContainerID(info.getContainerID());
ContainerInfo currentInfo = null;
try (AutoCloseableLock lock = autoLock.acquire()) {
currentInfo = containerMap.get(id);
if (currentInfo == null) {
throw new
SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
}
// We are updating two places before this update is done, these can
// fail independently, since the code needs to handle it.
// We update the attribute map, if that fails it will throw an exception,
// so no issues, if we are successful, we keep track of the fact that we
// have updated the lifecycle state in the map, and update the container
// state. If this second update fails, we will attempt to roll back the
// earlier change we did. If the rollback fails, we can be in an
// inconsistent state,
info.setState(newState);
containerMap.put(id, info);
lifeCycleStateMap.update(currentState, newState, id);
LOG.trace("Updated the container {} to new state. Old = {}, new = " +
"{}", id, currentState, newState);
} catch (SCMException ex) {
LOG.error("Unable to update the container state. {}", ex);
// we need to revert the change in this attribute since we are not
// able to update the hash table.
LOG.info("Reverting the update to lifecycle state. Moving back to " +
"old state. Old = {}, Attempted state = {}", currentState,
newState);
containerMap.put(id, currentInfo);
// if this line throws, the state map can be in an inconsistent
// state, since we will have modified the attribute by the
// container state will not in sync since we were not able to put
// that into the hash table.
lifeCycleStateMap.update(newState, currentState, id);
throw new SCMException("Updating the container map failed.", ex,
FAILED_TO_CHANGE_CONTAINER_STATE);
}
}
/**
* Returns A list of containers owned by a name service.
*
* @param ownerName - Name of the NameService.
* @return - NavigableSet of ContainerIDs.
*/
NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
Preconditions.checkNotNull(ownerName);
try (AutoCloseableLock lock = autoLock.acquire()) {
return ownerMap.getCollection(ownerName);
}
}
/**
* Returns Containers in the System by the Type.
*
* @param type - Replication type -- StandAlone, Ratis etc.
* @return NavigableSet
*/
NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
Preconditions.checkNotNull(type);
try (AutoCloseableLock lock = autoLock.acquire()) {
return typeMap.getCollection(type);
}
}
/**
* Returns Containers by replication factor.
*
* @param factor - Replication Factor.
* @return NavigableSet.
*/
NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
Preconditions.checkNotNull(factor);
try (AutoCloseableLock lock = autoLock.acquire()) {
return factorMap.getCollection(factor);
}
}
/**
* Returns Containers by State.
*
* @param state - State - Open, Closed etc.
* @return List of containers by state.
*/
NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) {
Preconditions.checkNotNull(state);
try (AutoCloseableLock lock = autoLock.acquire()) {
return lifeCycleStateMap.getCollection(state);
}
}
/**
* Gets the containers that matches the following filters.
*
* @param state - LifeCycleState
* @param owner - Owner
* @param factor - Replication Factor
* @param type - Replication Type
* @return ContainerInfo or Null if not container satisfies the criteria.
*/
public NavigableSet<ContainerID> getMatchingContainerIDs(
LifeCycleState state, String owner,
ReplicationFactor factor, ReplicationType type) {
Preconditions.checkNotNull(state, "State cannot be null");
Preconditions.checkNotNull(owner, "Owner cannot be null");
Preconditions.checkNotNull(factor, "Factor cannot be null");
Preconditions.checkNotNull(type, "Type cannot be null");
try (AutoCloseableLock lock = autoLock.acquire()) {
// If we cannot meet any one condition we return EMPTY_SET immediately.
// Since when we intersect these sets, the result will be empty if any
// one is empty.
NavigableSet<ContainerID> stateSet =
lifeCycleStateMap.getCollection(state);
if (stateSet.size() == 0) {
return EMPTY_SET;
}
NavigableSet<ContainerID> ownerSet = ownerMap.getCollection(owner);
if (ownerSet.size() == 0) {
return EMPTY_SET;
}
NavigableSet<ContainerID> factorSet = factorMap.getCollection(factor);
if (factorSet.size() == 0) {
return EMPTY_SET;
}
NavigableSet<ContainerID> typeSet = typeMap.getCollection(type);
if (typeSet.size() == 0) {
return EMPTY_SET;
}
// if we add more constraints we will just add those sets here..
NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
ownerSet, factorSet, typeSet);
NavigableSet<ContainerID> currentSet = sets[0];
// We take the smallest set and intersect against the larger sets. This
// allows us to reduce the lookups to the least possible number.
for (int x = 1; x < sets.length; x++) {
currentSet = intersectSets(currentSet, sets[x]);
}
return currentSet;
}
}
/**
* Calculates the intersection between sets and returns a new set.
*
* @param smaller - First Set
* @param bigger - Second Set
* @return resultSet which is the intersection of these two sets.
*/
private NavigableSet<ContainerID> intersectSets(
NavigableSet<ContainerID> smaller,
NavigableSet<ContainerID> bigger) {
Preconditions.checkState(smaller.size() <= bigger.size(),
"This function assumes the first set is lesser or equal to second " +
"set");
NavigableSet<ContainerID> resultSet = new TreeSet<>();
for (ContainerID id : smaller) {
if (bigger.contains(id)) {
resultSet.add(id);
}
}
return resultSet;
}
/**
* Sorts a list of Sets based on Size. This is useful when we are
* intersecting the sets.
*
* @param sets - varagrs of sets
* @return Returns a sorted array of sets based on the size of the set.
*/
@SuppressWarnings("unchecked")
private NavigableSet<ContainerID>[] sortBySize(
NavigableSet<ContainerID>... sets) {
for (int x = 0; x < sets.length - 1; x++) {
for (int y = 0; y < sets.length - x - 1; y++) {
if (sets[y].size() > sets[y + 1].size()) {
NavigableSet temp = sets[y];
sets[y] = sets[y + 1];
sets[y + 1] = temp;
}
}
}
return sets;
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
/**
* Container States management package.
*/
package org.apache.hadoop.ozone.scm.container.ContainerStates;

View File

@ -103,6 +103,7 @@ message ContainerInfo {
optional int64 writeCount = 7;
optional int64 readBytes = 8;
optional int64 writeBytes = 9;
required int64 containerID = 10;
}
// The deleted blocks which are stored in deletedBlock.db of scm.

View File

@ -122,6 +122,8 @@ public static Pipeline createPipeline(
final Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
pipeline.setContainerName(containerName);
pipeline.addMember(leader);
pipeline.setFactor(OzoneProtos.ReplicationFactor.ONE);
pipeline.setType(OzoneProtos.ReplicationType.STAND_ALONE);
for(; i.hasNext();) {
pipeline.addMember(i.next());
@ -346,7 +348,7 @@ public static ContainerCommandRequestProto getDeleteChunkRequest(
*/
public static ContainerCommandRequestProto getCreateContainerRequest(
String containerName, Pipeline pipeline) throws IOException {
LOG.trace("createContainer: {}", containerName);
LOG.trace("addContainer: {}", containerName);
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto

View File

@ -72,14 +72,17 @@ public List<ContainerReportsRequestProto> getContainerReport(
"required container reports");
}
int containerID = 1;
while (containerList.size() < dataNodeCount && nodesInPool.size() > 0) {
DatanodeID id = nodesInPool.get(r.nextInt(nodesInPool.size()));
nodesInPool.remove(id);
containerID++;
// We return container reports only for nodes that are healthy.
if (nodeManager.getNodeState(id) == HEALTHY) {
ContainerInfo info = ContainerInfo.newBuilder()
.setContainerName(containerName)
.setFinalhash(DigestUtils.sha256Hex(containerName))
.setContainerID(containerID)
.build();
ContainerReportsRequestProto containerReport =
ContainerReportsRequestProto.newBuilder().addReports(info)

View File

@ -448,6 +448,8 @@ private ContainerReportsRequestProto createContainerReport(int count) {
report.setReadBytes(OzoneConsts.GB * 1);
report.setWriteCount(50);
report.setWriteBytes(OzoneConsts.GB * 2);
report.setContainerID(1);
reportsBuilder.addReports(report.getProtoBufMessage());
}
reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID()

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.ozone.scm.container.ContainerStates;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
.LifeCycleState.OPEN;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
.ReplicationFactor.ONE;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
.ReplicationType.STAND_ALONE;
public class BenchmarkContainerStateMap {
@Test
public void testRunBenchMarks() throws RunnerException {
Options opt = new OptionsBuilder()
.include(this.getClass().getName() + ".*")
.mode(Mode.Throughput)
.timeUnit(TimeUnit.SECONDS)
.warmupTime(TimeValue.seconds(1))
.warmupIterations(2)
.measurementTime(TimeValue.seconds(1))
.measurementIterations(2)
.threads(2)
.forks(1)
.shouldFailOnError(true)
.shouldDoGC(true)
.build();
new Runner(opt).run();
}
@Benchmark
public void createContainerBenchMark(BenchmarkState state, Blackhole bh)
throws IOException {
Pipeline pipeline = ContainerTestHelper
.createSingleNodePipeline(UUID.randomUUID().toString());
int cid = state.containerID.incrementAndGet();
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.CLOSED)
.setPipeline(null)
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(cid)
.build();
state.stateMap.addContainer(containerInfo);
}
@Benchmark
public void getMatchingContainerBenchMark(BenchmarkState state,
Blackhole bh) {
state.stateMap.getMatchingContainerIDs(OPEN, "BILBO", ONE, STAND_ALONE);
}
@State(Scope.Thread)
public static class BenchmarkState {
public ContainerStateMap stateMap;
public AtomicInteger containerID;
@Setup(Level.Trial)
public void initialize() throws IOException {
stateMap = new ContainerStateMap();
Pipeline pipeline = ContainerTestHelper
.createSingleNodePipeline(UUID.randomUUID().toString());
int currentCount = 1;
for (int x = 1; x < 1000 * 1000; x++) {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.CLOSED)
.setPipeline(null)
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(x)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
} catch (SCMException e) {
e.printStackTrace();
}
}
for (int y = currentCount; y < 2000; y++) {
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.OPEN)
.setPipeline(null)
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(y)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
} catch (SCMException e) {
e.printStackTrace();
}
}
try {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.OPEN)
.setPipeline(null)
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(currentCount++)
.build();
stateMap.addContainer(containerInfo);
} catch (SCMException e) {
e.printStackTrace();
}
containerID = new AtomicInteger(currentCount++);
}
}
}

View File

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.ozone.scm.container.ContainerStates;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Arrays;
import java.util.List;
public class TestContainerAttribute {
@Rule
public ExpectedException thrown = ExpectedException.none();
@Test
public void testInsert() throws SCMException {
ContainerAttribute<Integer> containerAttribute = new ContainerAttribute<>();
ContainerID id = new ContainerID(42);
containerAttribute.insert(1, id);
Assert.assertEquals(1,
containerAttribute.getCollection(1).size());
Assert.assertTrue(containerAttribute.getCollection(1).contains(id));
// Insert again and verify that it overwrites an existing value.
ContainerID newId =
new ContainerID(42);
containerAttribute.insert(1, newId);
Assert.assertEquals(1,
containerAttribute.getCollection(1).size());
Assert.assertTrue(containerAttribute.getCollection(1).contains(newId));
}
@Test
public void testHasKey() throws SCMException {
ContainerAttribute<Integer> containerAttribute = new ContainerAttribute<>();
for (int x = 1; x < 42; x++) {
containerAttribute.insert(1, new ContainerID(x));
}
Assert.assertTrue(containerAttribute.hasKey(1));
for (int x = 1; x < 42; x++) {
Assert.assertTrue(containerAttribute.hasContainerID(1, x));
}
Assert.assertFalse(containerAttribute.hasContainerID(1,
new ContainerID(42)));
}
@Test
public void testClearSet() throws SCMException {
List<String> keyslist = Arrays.asList("Key1", "Key2", "Key3");
ContainerAttribute<String> containerAttribute = new ContainerAttribute<>();
for (String k : keyslist) {
for (int x = 1; x < 101; x++) {
containerAttribute.insert(k, new ContainerID(x));
}
}
for (String k : keyslist) {
Assert.assertEquals(100,
containerAttribute.getCollection(k).size());
}
containerAttribute.clearSet("Key1");
Assert.assertEquals(0,
containerAttribute.getCollection("Key1").size());
}
@Test
public void testRemove() throws SCMException {
List<String> keyslist = Arrays.asList("Key1", "Key2", "Key3");
ContainerAttribute<String> containerAttribute = new ContainerAttribute<>();
for (String k : keyslist) {
for (int x = 1; x < 101; x++) {
containerAttribute.insert(k, new ContainerID(x));
}
}
for (int x = 1; x < 101; x += 2) {
containerAttribute.remove("Key1", new ContainerID(x));
}
for (int x = 1; x < 101; x += 2) {
Assert.assertFalse(containerAttribute.hasContainerID("Key1",
new ContainerID(x)));
}
Assert.assertEquals(100,
containerAttribute.getCollection("Key2").size());
Assert.assertEquals(100,
containerAttribute.getCollection("Key3").size());
Assert.assertEquals(50,
containerAttribute.getCollection("Key1").size());
}
@Test
public void tesUpdate() throws SCMException {
String key1 = "Key1";
String key2 = "Key2";
String key3 = "Key3";
ContainerAttribute<String> containerAttribute = new ContainerAttribute<>();
ContainerID id = new ContainerID(42);
containerAttribute.insert(key1, id);
Assert.assertTrue(containerAttribute.hasContainerID(key1, id));
Assert.assertFalse(containerAttribute.hasContainerID(key2, id));
// This should move the id from key1 bucket to key2 bucket.
containerAttribute.update(key1, key2, id);
Assert.assertFalse(containerAttribute.hasContainerID(key1, id));
Assert.assertTrue(containerAttribute.hasContainerID(key2, id));
// This should fail since we cannot find this id in the key3 bucket.
thrown.expect(SCMException.class);
containerAttribute.update(key3, key1, id);
}
}

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.hadoop.ozone.scm.container.ContainerStates;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.SortedSet;
import java.util.UUID;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.CLOSED;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.OPEN;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType.STAND_ALONE;
public class TestContainerStateMap {
@Test
public void testLifeCyleStates() throws IOException {
ContainerStateMap stateMap = new ContainerStateMap();
int currentCount = 1;
Pipeline pipeline = ContainerTestHelper
.createSingleNodePipeline(UUID.randomUUID().toString());
for (int x = 1; x < 1001; x++) {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.OPEN)
.setPipeline(pipeline)
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(x)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
}
SortedSet<ContainerID> openSet = stateMap.getMatchingContainerIDs(OPEN,
"OZONE", ONE, STAND_ALONE);
Assert.assertEquals(1000, openSet.size());
int nextMax = currentCount + 1000;
for (int y = currentCount; y < nextMax; y++) {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.CLOSED)
.setPipeline(pipeline)
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(y)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
}
openSet = stateMap.getMatchingContainerIDs(OPEN, "OZONE",
ONE, STAND_ALONE);
SortedSet<ContainerID> closeSet = stateMap.getMatchingContainerIDs(CLOSED,
"OZONE", ONE, STAND_ALONE);
// Assert that open is still 1000 and we added 1000 more closed containers.
Assert.assertEquals(1000, openSet.size());
Assert.assertEquals(1000, closeSet.size());
SortedSet<ContainerID> ownerSet = stateMap.getContainerIDsByOwner("OZONE");
// Ozone owns 1000 open and 1000 closed containers.
Assert.assertEquals(2000, ownerSet.size());
}
@Test
public void testGetMatchingContainers() throws IOException {
ContainerStateMap stateMap = new ContainerStateMap();
Pipeline pipeline = ContainerTestHelper
.createSingleNodePipeline(UUID.randomUUID().toString());
int currentCount = 1;
for (int x = 1; x < 1001; x++) {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.OPEN)
.setPipeline(pipeline)
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(x)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
}
SortedSet<ContainerID> openSet = stateMap.getMatchingContainerIDs(OPEN,
"OZONE", ONE, STAND_ALONE);
Assert.assertEquals(1000, openSet.size());
int nextMax = currentCount + 200;
for (int y = currentCount; y < nextMax; y++) {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.CLOSED)
.setPipeline(pipeline)
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(y)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
}
nextMax = currentCount + 30000;
for (int z = currentCount; z < nextMax; z++) {
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.OPEN)
.setPipeline(pipeline)
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(z)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
}
// At this point, if we get all Open Containers that belong to Ozone,
// with one replica and standalone replica strategy -- we should get
// 1000 + 30000.
openSet = stateMap.getMatchingContainerIDs(OPEN,
"OZONE", ONE, STAND_ALONE);
Assert.assertEquals(1000 + 30000, openSet.size());
// There is no such owner, so should be a set of zero size.
SortedSet<ContainerID> zeroSet = stateMap.getMatchingContainerIDs(OPEN,
"BILBO", ONE, STAND_ALONE);
Assert.assertEquals(0, zeroSet.size());
int nextId = currentCount++;
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.OPEN)
.setPipeline(pipeline)
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("BILBO")
.setContainerID(nextId)
.build();
stateMap.addContainer(containerInfo);
zeroSet = stateMap.getMatchingContainerIDs(OPEN,
"BILBO", ONE, STAND_ALONE);
Assert.assertEquals(1, zeroSet.size());
// Assert that the container we got back is the nextID itself.
Assert.assertTrue(zeroSet.contains(new ContainerID(nextId)));
}
@Test
public void testUpdateState() throws IOException {
ContainerStateMap stateMap = new ContainerStateMap();
Pipeline pipeline = ContainerTestHelper
.createSingleNodePipeline(UUID.randomUUID().toString());
ContainerInfo containerInfo = null;
int currentCount = 1;
for (int x = 1; x < 1001; x++) {
containerInfo = new ContainerInfo.Builder()
.setContainerName(pipeline.getContainerName())
.setState(OzoneProtos.LifeCycleState.OPEN)
.setPipeline(pipeline)
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner("OZONE")
.setContainerID(x)
.build();
stateMap.addContainer(containerInfo);
currentCount++;
}
stateMap.updateState(containerInfo, OPEN, CLOSED);
SortedSet<ContainerID> closedSet = stateMap.getMatchingContainerIDs(CLOSED,
"OZONE", ONE, STAND_ALONE);
Assert.assertEquals(1, closedSet.size());
Assert.assertTrue(closedSet.contains(containerInfo.containerID()));
SortedSet<ContainerID> openSet = stateMap.getMatchingContainerIDs(OPEN,
"OZONE", ONE, STAND_ALONE);
Assert.assertEquals(999, openSet.size());
}
}

View File

@ -26,6 +26,7 @@
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
@ -43,11 +44,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Tests for Container Mapping.
@ -187,15 +188,13 @@ public void testContainerCreationLeaseTimeout() throws IOException,
OzoneProtos.LifeCycleEvent.CREATE);
Thread.sleep(TIMEOUT + 1000);
List<ContainerInfo> deleteContainers = mapping.getStateManager()
.getMatchingContainers(
containerInfo.getOwner(),
NavigableSet<ContainerID> deleteContainers = mapping.getStateManager()
.getMatchingContainerIDs(
"OZONE",
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETING);
Assert.assertTrue(deleteContainers.stream().map(
container -> container.getContainerName()).collect(
Collectors.toList()).contains(containerName));
Assert.assertTrue(deleteContainers.contains(containerInfo.containerID()));
thrown.expect(IOException.class);
thrown.expectMessage("Lease Exception");
@ -207,7 +206,7 @@ public void testContainerCreationLeaseTimeout() throws IOException,
public void testFullContainerReport() throws IOException,
InterruptedException {
String containerName = UUID.randomUUID().toString();
createContainer(containerName);
ContainerInfo info = createContainer(containerName);
DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
ContainerReportsRequestProto.reportType reportType =
ContainerReportsRequestProto.reportType.fullReport;
@ -224,7 +223,8 @@ public void testFullContainerReport() throws IOException,
.setReadCount(100000000L)
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L);
.setWriteBytes(2000000000L)
.setContainerID(info.getContainerID());
reports.add(ciBuilder.build());
mapping.processContainerReports(datanodeID, reportType, reports);
@ -237,7 +237,7 @@ public void testFullContainerReport() throws IOException,
@Test
public void testContainerCloseWithContainerReport() throws IOException {
String containerName = UUID.randomUUID().toString();
createContainer(containerName);
ContainerInfo info = createContainer(containerName);
DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
ContainerReportsRequestProto.reportType reportType =
ContainerReportsRequestProto.reportType.fullReport;
@ -255,7 +255,8 @@ public void testContainerCloseWithContainerReport() throws IOException {
.setReadCount(500000000L)
.setWriteCount(500000000L)
.setReadBytes(5368705120L)
.setWriteBytes(5368705120L);
.setWriteBytes(5368705120L)
.setContainerID(info.getContainerID());
reports.add(ciBuilder.build());
@ -264,43 +265,38 @@ public void testContainerCloseWithContainerReport() throws IOException {
ContainerInfo updatedContainer = mapping.getContainer(containerName);
Assert.assertEquals(500000000L, updatedContainer.getNumberOfKeys());
Assert.assertEquals(5368705120L, updatedContainer.getUsedBytes());
List<ContainerInfo> pendingCloseContainers = mapping.getStateManager()
.getMatchingContainers(
NavigableSet<ContainerID> pendingCloseContainers = mapping.getStateManager()
.getMatchingContainerIDs(
containerOwner,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSING);
Assert.assertTrue(pendingCloseContainers.stream().map(
container -> container.getContainerName()).collect(
Collectors.toList()).contains(containerName));
Assert.assertTrue(
pendingCloseContainers.contains(updatedContainer.containerID()));
}
@Test
public void testCloseContainer() throws IOException {
String containerName = UUID.randomUUID().toString();
createContainer(containerName);
ContainerInfo info = createContainer(containerName);
mapping.updateContainerState(containerName,
OzoneProtos.LifeCycleEvent.FINALIZE);
List<ContainerInfo> pendingCloseContainers = mapping.getStateManager()
.getMatchingContainers(
NavigableSet<ContainerID> pendingCloseContainers = mapping.getStateManager()
.getMatchingContainerIDs(
containerOwner,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSING);
Assert.assertTrue(pendingCloseContainers.stream().map(
container -> container.getContainerName()).collect(
Collectors.toList()).contains(containerName));
Assert.assertTrue(pendingCloseContainers.contains(info.containerID()));
mapping.updateContainerState(containerName,
OzoneProtos.LifeCycleEvent.CLOSE);
List<ContainerInfo> closeContainers = mapping.getStateManager()
.getMatchingContainers(
NavigableSet<ContainerID> closeContainers = mapping.getStateManager()
.getMatchingContainerIDs(
containerOwner,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSED);
Assert.assertTrue(closeContainers.stream().map(
container -> container.getContainerName()).collect(
Collectors.toList()).contains(containerName));
Assert.assertTrue(closeContainers.contains(info.containerID()));
}
/**
@ -309,7 +305,8 @@ public void testCloseContainer() throws IOException {
* Name of the container
* @throws IOException
*/
private void createContainer(String containerName) throws IOException {
private ContainerInfo createContainer(String containerName)
throws IOException {
nodeManager.setChillmode(false);
ContainerInfo containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
@ -320,6 +317,7 @@ private void createContainer(String containerName) throws IOException {
OzoneProtos.LifeCycleEvent.CREATE);
mapping.updateContainerState(containerInfo.getContainerName(),
OzoneProtos.LifeCycleEvent.CREATED);
return containerInfo;
}
}

View File

@ -23,17 +23,17 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.List;
import java.nio.charset.Charset;
import java.util.NavigableSet;
import java.util.Random;
/**
@ -49,8 +49,6 @@ public class TestContainerStateManager {
private ContainerStateManager stateManager;
private String containerOwner = "OZONE";
@Rule
public ExpectedException thrown = ExpectedException.none();
@Before
public void setup() throws IOException {
@ -95,7 +93,7 @@ public void testAllocateContainer() throws IOException {
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2, containerOwner);
int numContainers = stateManager
.getMatchingContainers(containerOwner,
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(2, numContainers);
@ -117,14 +115,14 @@ public void testContainerStateManagerRestart() throws IOException {
// New instance of ContainerStateManager should load all the containers in
// container store.
ContainerStateManager stateManager =
new ContainerStateManager(conf, scmContainerMapping,
128 * OzoneConsts.MB);
new ContainerStateManager(conf, scmContainerMapping
);
int containers = stateManager
.getMatchingContainers(containerOwner,
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(5, containers);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(5, containers);
@ -132,7 +130,7 @@ public void testContainerStateManagerRestart() throws IOException {
@Test
public void testGetMatchingContainer() throws IOException {
String container1 = "container" + RandomStringUtils.randomNumeric(5);
String container1 = "container-01234";
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1, containerOwner);
scmContainerMapping.updateContainerState(container1,
@ -140,7 +138,7 @@ public void testGetMatchingContainer() throws IOException {
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.CREATED);
String container2 = "container" + RandomStringUtils.randomNumeric(5);
String container2 = "container-56789";
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2, containerOwner);
@ -150,12 +148,6 @@ public void testGetMatchingContainer() throws IOException {
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(container1, info.getContainerName());
info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(null, info);
info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
@ -166,17 +158,18 @@ public void testGetMatchingContainer() throws IOException {
OzoneProtos.LifeCycleEvent.CREATE);
scmContainerMapping.updateContainerState(container2,
OzoneProtos.LifeCycleEvent.CREATED);
info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(container2, info.getContainerName());
Assert.assertEquals(container1, info.getContainerName());
}
@Test
public void testUpdateContainerState() throws IOException {
List<ContainerInfo> containerList = stateManager
.getMatchingContainers(containerOwner,
NavigableSet<ContainerID> containerList = stateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED);
int containers = containerList == null ? 0 : containerList.size();
@ -187,49 +180,49 @@ public void testUpdateContainerState() throws IOException {
String container1 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1, containerOwner);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(1, containers);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.CREATE);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(1, containers);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.CREATED);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.FINALIZE);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSING).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.CLOSE);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSED).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.DELETE);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETING).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.CLEANUP);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETED).size();
Assert.assertEquals(1, containers);
@ -243,7 +236,7 @@ public void testUpdateContainerState() throws IOException {
OzoneProtos.LifeCycleEvent.CREATE);
scmContainerMapping
.updateContainerState(container2, OzoneProtos.LifeCycleEvent.TIMEOUT);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETING).size();
Assert.assertEquals(1, containers);
@ -261,7 +254,7 @@ public void testUpdateContainerState() throws IOException {
OzoneProtos.LifeCycleEvent.FINALIZE);
scmContainerMapping
.updateContainerState(container3, OzoneProtos.LifeCycleEvent.CLOSE);
containers = stateManager.getMatchingContainers(containerOwner,
containers = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSED).size();
Assert.assertEquals(1, containers);
@ -295,9 +288,10 @@ public void testUpdatingAllocatedBytes() throws Exception {
// to disk
containerMapping.flushContainerInfo();
Charset utf8 = Charset.forName("UTF-8");
// the persisted value should always be equal to allocated size.
byte[] containerBytes =
containerMapping.getContainerStore().get(container1.getBytes());
containerMapping.getContainerStore().get(container1.getBytes(utf8));
OzoneProtos.SCMContainerInfo infoProto =
OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto);