HDFS-12474. Ozone: SCM: Handling container report with key count and container usage. Contributed by Nanda kumar.

This commit is contained in:
Xiaoyu Yao 2017-11-06 15:52:03 -08:00 committed by Owen O'Malley
parent 2ec24a7542
commit 709d56fdc9
14 changed files with 469 additions and 274 deletions

View File

@ -223,6 +223,12 @@ public final class ScmConfigKeys {
"ozone.scm.block.deletion.max.retry";
public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096;
// Once a container usage crosses this threshold, it is eligible for
// closing.
public static final String OZONE_SCM_CONTAINER_CLOSE_THRESHOLD =
"ozone.scm.container.close.threshold";
public static final float OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
/**
* Never constructed.
*/

View File

@ -1,147 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm.container.common.helpers;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.util.Time;
import java.io.Serializable;
import java.util.Comparator;
/**
* Manages Block Information inside a container.
*/
public class BlockContainerInfo extends ContainerInfo
implements Comparator<BlockContainerInfo>,
Comparable<BlockContainerInfo>, Serializable {
private long allocated;
private long lastUsed; // last used time
public BlockContainerInfo(ContainerInfo container, long used) {
super(container);
this.allocated = used;
this.lastUsed = Time.monotonicNow();
}
public long addAllocated(long size) {
allocated += size;
return allocated;
}
public long subtractAllocated(long size) {
allocated -= size;
return allocated;
}
public long getAllocated() {
return this.allocated;
}
/**
* Gets the last used time from SCM's perspective.
* @return time in milliseconds.
*/
public long getLastUsed() {
return lastUsed;
}
/**
* Sets the last used time from SCM's perspective.
* @param lastUsed time in milliseconds.
*/
public void setLastUsed(long lastUsed) {
this.lastUsed = lastUsed;
}
/**
* Compares its two arguments for order. Returns a negative integer, zero, or
* a positive integer as the first argument is less than, equal to, or greater
* than the second.<p>
*
* @param o1 the first object to be compared.
* @param o2 the second object to be compared.
* @return a negative integer, zero, or a positive integer as the first
* argument is less than, equal to, or greater than the second.
* @throws NullPointerException if an argument is null and this comparator
* does not permit null arguments
* @throws ClassCastException if the arguments' types prevent them from
* being compared by this comparator.
*/
@Override
public int compare(BlockContainerInfo o1, BlockContainerInfo o2) {
return Long.compare(o1.getLastUsed(), o2.getLastUsed());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return new EqualsBuilder()
.appendSuper(super.equals(o))
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 2017)
.appendSuper(super.hashCode())
.toHashCode();
}
@Override
public String toString() {
return "BlockContainerInfo{" +
"allocated=" + allocated +
", lastUsed=" + lastUsed +
", ContainerInfo=" + super.toString() + '}';
}
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less than,
* equal to, or greater than the specified object.
*
* @param o the object to be compared.
* @return a negative integer, zero, or a positive integer as this object is
* less than, equal to, or greater than the specified object.
* @throws NullPointerException if the specified object is null
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
@Override
public int compareTo(BlockContainerInfo o) {
return this.compare(this, o);
}
public boolean canAllocate(long size, long containerSize) {
//TODO: move container size inside Container Info
return ((getState() == OzoneProtos.LifeCycleState.ALLOCATED ||
getState() == OzoneProtos.LifeCycleState.OPEN) &&
(getAllocated() + size <= containerSize));
}
}

View File

@ -23,10 +23,19 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.util.Time;
import java.util.Comparator;
/** Class wraps ozone container info. */
public class ContainerInfo {
public class ContainerInfo
implements Comparator<ContainerInfo>, Comparable<ContainerInfo> {
private OzoneProtos.LifeCycleState state;
private Pipeline pipeline;
// Bytes allocated by SCM for clients.
private long allocatedBytes;
// Actual container usage, updated through heartbeat.
private long usedBytes;
private long numberOfKeys;
private long lastUsed;
// The wall-clock ms since the epoch at which the current state enters.
private long stateEnterTime;
private OzoneProtos.Owner owner;
@ -36,23 +45,22 @@ public class ContainerInfo {
final String containerName,
OzoneProtos.LifeCycleState state,
Pipeline pipeline,
long allocatedBytes,
long usedBytes,
long numberOfKeys,
long stateEnterTime,
OzoneProtos.Owner owner) {
this.containerName = containerName;
this.pipeline = pipeline;
this.allocatedBytes = allocatedBytes;
this.usedBytes = usedBytes;
this.numberOfKeys = numberOfKeys;
this.lastUsed = Time.monotonicNow();
this.state = state;
this.stateEnterTime = stateEnterTime;
this.owner = owner;
}
public ContainerInfo(ContainerInfo container) {
this.pipeline = container.getPipeline();
this.state = container.getState();
this.containerName = container.getContainerName();
this.stateEnterTime = container.getStateEnterTime();
this.owner = container.getOwner();
}
/**
* Needed for serialization findbugs.
*/
@ -62,6 +70,9 @@ public class ContainerInfo {
public static ContainerInfo fromProtobuf(OzoneProtos.SCMContainerInfo info) {
ContainerInfo.Builder builder = new ContainerInfo.Builder();
builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline()));
builder.setAllocatedBytes(info.getAllocatedBytes());
builder.setUsedBytes(info.getUsedBytes());
builder.setNumberOfKeys(info.getNumberOfKeys());
builder.setState(info.getState());
builder.setStateEnterTime(info.getStateEnterTime());
builder.setOwner(info.getOwner());
@ -73,24 +84,10 @@ public class ContainerInfo {
return containerName;
}
public void setContainerName(String containerName) {
this.containerName = containerName;
}
public OzoneProtos.LifeCycleState getState() {
return state;
}
/**
* Update the current container state and state enter time to now.
*
* @param state
*/
public void setState(OzoneProtos.LifeCycleState state) {
this.state = state;
this.stateEnterTime = Time.monotonicNow();
}
public long getStateEnterTime() {
return stateEnterTime;
}
@ -99,10 +96,43 @@ public class ContainerInfo {
return pipeline;
}
public long getAllocatedBytes() {
return allocatedBytes;
}
public long getUsedBytes() {
return usedBytes;
}
public long getNumberOfKeys() {
return numberOfKeys;
}
/**
* Gets the last used time from SCM's perspective.
* @return time in milliseconds.
*/
public long getLastUsed() {
return lastUsed;
}
public void updateLastUsedTime() {
lastUsed = Time.monotonicNow();
}
public void allocate(long size) {
// should we also have total container size in ContainerInfo
// and check before allocating?
allocatedBytes += size;
}
public OzoneProtos.SCMContainerInfo getProtobuf() {
OzoneProtos.SCMContainerInfo.Builder builder =
OzoneProtos.SCMContainerInfo.newBuilder();
builder.setPipeline(getPipeline().getProtobufMessage());
builder.setAllocatedBytes(getAllocatedBytes());
builder.setUsedBytes(getUsedBytes());
builder.setNumberOfKeys(getNumberOfKeys());
builder.setState(state);
builder.setStateEnterTime(stateEnterTime);
@ -145,7 +175,6 @@ public class ContainerInfo {
ContainerInfo that = (ContainerInfo) o;
return new EqualsBuilder()
.append(state, that.state)
.append(pipeline.getContainerName(), that.pipeline.getContainerName())
// TODO : Fix this later. If we add these factors some tests fail.
@ -168,10 +197,49 @@ public class ContainerInfo {
.toHashCode();
}
/**
* Compares its two arguments for order. Returns a negative integer, zero, or
* a positive integer as the first argument is less than, equal to, or greater
* than the second.<p>
*
* @param o1 the first object to be compared.
* @param o2 the second object to be compared.
* @return a negative integer, zero, or a positive integer as the first
* argument is less than, equal to, or greater than the second.
* @throws NullPointerException if an argument is null and this comparator
* does not permit null arguments
* @throws ClassCastException if the arguments' types prevent them from
* being compared by this comparator.
*/
@Override
public int compare(ContainerInfo o1, ContainerInfo o2) {
return Long.compare(o1.getLastUsed(), o2.getLastUsed());
}
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less than,
* equal to, or greater than the specified object.
*
* @param o the object to be compared.
* @return a negative integer, zero, or a positive integer as this object is
* less than, equal to, or greater than the specified object.
* @throws NullPointerException if the specified object is null
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
@Override
public int compareTo(ContainerInfo o) {
return this.compare(this, o);
}
/** Builder class for ContainerInfo. */
public static class Builder {
private OzoneProtos.LifeCycleState state;
private Pipeline pipeline;
private long allocated;
private long used;
private long keys;
private long stateEnterTime;
private OzoneProtos.Owner owner;
private String containerName;
@ -186,6 +254,21 @@ public class ContainerInfo {
return this;
}
public Builder setAllocatedBytes(long bytesAllocated) {
this.allocated = bytesAllocated;
return this;
}
public Builder setUsedBytes(long bytesUsed) {
this.used = bytesUsed;
return this;
}
public Builder setNumberOfKeys(long keyCount) {
this.keys = keyCount;
return this;
}
public Builder setStateEnterTime(long stateEnterTime) {
this.stateEnterTime = stateEnterTime;
return this;
@ -203,7 +286,8 @@ public class ContainerInfo {
public ContainerInfo build() {
return new
ContainerInfo(containerName, state, pipeline, stateEnterTime, owner);
ContainerInfo(containerName, state, pipeline, allocated, used,
keys, stateEnterTime, owner);
}
}
}

View File

@ -114,8 +114,13 @@ message SCMContainerInfo {
required string containerName = 1;
required LifeCycleState state = 2;
required Pipeline pipeline = 3;
optional int64 stateEnterTime = 4;
optional Owner owner = 5 [default = OZONE];
// This is not total size of container, but space allocated by SCM for
// clients to write blocks
required uint64 allocatedBytes = 4;
required uint64 usedBytes = 5;
required uint64 numberOfKeys = 6;
optional int64 stateEnterTime = 7;
optional Owner owner = 8 [default = OZONE];
}
enum ReplicationType {

View File

@ -20,9 +20,12 @@ package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import java.util.List;

View File

@ -787,8 +787,10 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
metrics.setLastContainerReportWriteCount(stat.getWriteCount().get());
}
// TODO: handle the container reports either here or add container report
// handler.
// should we process container reports async?
scmContainerManager.processContainerReports(
DatanodeID.getFromProtoBuf(reports.getDatanodeID()),
reports.getType(), reports.getReportsList());
return ContainerReportsResponseProto.newBuilder().build();
}

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.StringUtils;
@ -260,7 +259,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
can use different kind of policies.
*/
BlockContainerInfo containerInfo = null;
ContainerInfo containerInfo;
// Look for ALLOCATED container that matches all other parameters.
containerInfo =
@ -327,7 +326,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
* @return AllocatedBlock
*/
private AllocatedBlock newBlock(
BlockContainerInfo containerInfo, OzoneProtos.LifeCycleState state)
ContainerInfo containerInfo, OzoneProtos.LifeCycleState state)
throws IOException {
// TODO : Replace this with Block ID.

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
@ -27,12 +28,14 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
@ -69,6 +72,7 @@ public class ContainerMapping implements Mapping {
private final PipelineSelector pipelineSelector;
private final ContainerStateManager containerStateManager;
private final LeaseManager<ContainerInfo> containerLeaseManager;
private final float containerCloseThreshold;
/**
* Constructs a mapping class that creates mapping between container names
@ -108,6 +112,9 @@ public class ContainerMapping implements Mapping {
this.pipelineSelector = new PipelineSelector(nodeManager, conf);
this.containerStateManager =
new ContainerStateManager(conf, this, this.cacheSize * OzoneConsts.MB);
this.containerCloseThreshold = conf.getFloat(
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
LOG.trace("Container State Manager created.");
long containerCreationLeaseTimeout = conf.getLong(
@ -293,8 +300,7 @@ public class ContainerMapping implements Mapping {
containerLeaseManager.acquire(containerInfo);
// Register callback to be executed in case of timeout
containerLease.registerCallBack(() -> {
containerStateManager.updateContainerState(
new BlockContainerInfo(containerInfo, 0),
updateContainerState(containerName,
OzoneProtos.LifeCycleEvent.TIMEOUT);
return null;
});
@ -302,12 +308,9 @@ public class ContainerMapping implements Mapping {
// Release the lease on container
containerLeaseManager.release(containerInfo);
}
// TODO: Actual used will be updated via Container Reports later.
containerInfo.setState(
containerStateManager.updateContainerState(
new BlockContainerInfo(containerInfo, 0), event));
containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
ContainerInfo updatedContainer = containerStateManager
.updateContainerState(containerInfo, event);
containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
return containerInfo.getState();
} catch (LeaseException e) {
throw new IOException("Lease Exception.", e);
@ -323,6 +326,85 @@ public class ContainerMapping implements Mapping {
return containerStateManager;
}
/**
* Process container report from Datanode.
*
* @param datanodeID Datanode ID
* @param reportType Type of report
* @param containerInfos container details
*/
@Override
public void processContainerReports(
DatanodeID datanodeID,
ContainerReportsRequestProto.reportType reportType,
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos) throws IOException {
for (StorageContainerDatanodeProtocolProtos.ContainerInfo containerInfo :
containerInfos) {
byte[] dbKey = containerInfo.getContainerNameBytes().toByteArray();
lock.lock();
try {
byte[] containerBytes = containerStore.get(dbKey);
if (containerBytes != null) {
OzoneProtos.SCMContainerInfo oldInfo =
OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
OzoneProtos.SCMContainerInfo.Builder builder =
OzoneProtos.SCMContainerInfo.newBuilder();
builder.setContainerName(oldInfo.getContainerName());
builder.setPipeline(oldInfo.getPipeline());
// If used size is greater than allocated size, we will be updating
// allocated size with used size. This update is done as a fallback
// mechanism in case SCM crashes without properly updating allocated
// size. Correct allocated value will be updated by
// ContainerStateManager during SCM shutdown.
long usedSize = containerInfo.getUsed();
long allocated = oldInfo.getAllocatedBytes() > usedSize ?
oldInfo.getAllocatedBytes() : usedSize;
builder.setAllocatedBytes(allocated);
builder.setUsedBytes(containerInfo.getUsed());
builder.setNumberOfKeys(containerInfo.getKeyCount());
builder.setState(oldInfo.getState());
builder.setStateEnterTime(oldInfo.getStateEnterTime());
if (oldInfo.getOwner() != null) {
builder.setOwner(oldInfo.getOwner());
}
OzoneProtos.SCMContainerInfo newContainerInfo = builder.build();
containerStore.put(dbKey, newContainerInfo.toByteArray());
float containerUsedPercentage = 1.0f *
containerInfo.getUsed() / containerInfo.getSize();
// TODO: Handling of containers which are already in close queue.
if (containerUsedPercentage >= containerCloseThreshold) {
// TODO: The container has to be moved to close container queue.
// For now, we are just updating the container state to CLOSED.
// Close container implementation can decide on how to maintain
// list of containers to be closed, this is the place where we
// have to add the containers to that list.
ContainerInfo updatedContainer =
containerStateManager.updateContainerState(
ContainerInfo.fromProtobuf(newContainerInfo),
OzoneProtos.LifeCycleEvent.CLOSE);
if (updatedContainer.getState() !=
OzoneProtos.LifeCycleState.CLOSED) {
LOG.error("Failed to close container {}, reason : Not able to " +
"update container state, current container state: {}." +
"in state {}", containerInfo.getContainerName(),
updatedContainer.getState());
}
}
} else {
// Container not found in our container db.
LOG.error("Error while processing container report from datanode :" +
" {}, for container: {}, reason: container doesn't exist in" +
"container database.");
}
} finally {
lock.unlock();
}
}
}
/**
* Closes this stream and releases any system resources associated with it.
* If the stream is

View File

@ -30,13 +30,13 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
@ -57,7 +57,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
import static org.apache.hadoop.ozone.scm.exceptions
.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
/**
* A container state manager keeps track of container states and returns
@ -116,7 +117,7 @@ import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.FA
* TimeOut Delete Container State Machine - if the container creating times out,
* then Container State manager decides to delete the container.
*/
public class ContainerStateManager {
public class ContainerStateManager implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerStateManager.class);
@ -130,8 +131,8 @@ public class ContainerStateManager {
// A map that maintains the ContainerKey to Containers of that type ordered
// by last access time.
private final ReadWriteLock lock;
private final Queue<BlockContainerInfo> containerCloseQueue;
private Map<ContainerKey, PriorityQueue<BlockContainerInfo>> containers;
private final Queue<ContainerInfo> containerCloseQueue;
private Map<ContainerKey, PriorityQueue<ContainerInfo>> containers;
/**
* Constructs a Container State Manager that tracks all containers owned by
@ -167,7 +168,7 @@ public class ContainerStateManager {
containers = new HashMap<>();
initializeContainerMaps();
loadExistingContainers(containerMapping);
containerCloseQueue = new ConcurrentLinkedQueue<BlockContainerInfo>();
containerCloseQueue = new ConcurrentLinkedQueue<>();
}
/**
@ -191,7 +192,7 @@ public class ContainerStateManager {
for (ReplicationFactor factor : ReplicationFactor.values()) {
for (LifeCycleState state : LifeCycleState.values()) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
PriorityQueue<BlockContainerInfo> queue = new PriorityQueue<>();
PriorityQueue<ContainerInfo> queue = new PriorityQueue<>();
containers.put(key, queue);
}
}
@ -212,9 +213,7 @@ public class ContainerStateManager {
ContainerKey key = new ContainerKey(container.getOwner(),
container.getPipeline().getType(),
container.getPipeline().getFactor(), container.getState());
BlockContainerInfo blockContainerInfo =
new BlockContainerInfo(container, 0);
((PriorityQueue) containers.get(key)).add(blockContainerInfo);
containers.get(key).add(container);
}
} catch (IOException e) {
if (!e.getMessage().equals("No container exists in current db")) {
@ -289,28 +288,31 @@ public class ContainerStateManager {
Pipeline pipeline = selector.getReplicationPipeline(type,
replicationFactor, containerName);
ContainerInfo info = new ContainerInfo.Builder()
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(containerName)
.setState(OzoneProtos.LifeCycleState.ALLOCATED)
.setPipeline(pipeline)
// This is bytes allocated for blocks inside container, not the
// container size
.setAllocatedBytes(0)
.setUsedBytes(0)
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner(owner)
.build();
Preconditions.checkNotNull(info);
BlockContainerInfo blockInfo = new BlockContainerInfo(info, 0);
blockInfo.setLastUsed(Time.monotonicNow());
Preconditions.checkNotNull(containerInfo);
lock.writeLock().lock();
try {
ContainerKey key = new ContainerKey(owner, type, replicationFactor,
blockInfo.getState());
PriorityQueue<BlockContainerInfo> queue = containers.get(key);
containerInfo.getState());
PriorityQueue<ContainerInfo> queue = containers.get(key);
Preconditions.checkNotNull(queue);
queue.add(blockInfo);
LOG.trace("New container allocated: {}", blockInfo);
queue.add(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
} finally {
lock.writeLock().unlock();
}
return info;
return containerInfo;
}
/**
@ -318,20 +320,14 @@ public class ContainerStateManager {
*
* @param info - ContainerInfo
* @param event - LifeCycle Event
* @return New state of the container.
* @return Updated ContainerInfo.
* @throws SCMException
*/
public OzoneProtos.LifeCycleState updateContainerState(BlockContainerInfo
public ContainerInfo updateContainerState(ContainerInfo
info, OzoneProtos.LifeCycleEvent event) throws SCMException {
LifeCycleState newState = null;
boolean shouldLease = false;
LifeCycleState newState;
try {
newState = this.stateMachine.getNextState(info.getState(), event);
if(newState == LifeCycleState.CREATING) {
// if we are moving into a Creating State, it is possible that clients
// could timeout therefore we need to use a lease.
shouldLease = true;
}
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update container state %s, " +
"reason: invalid state transition from state: %s upon event: %s.",
@ -352,7 +348,7 @@ public class ContainerStateManager {
lock.writeLock().lock();
try {
PriorityQueue<BlockContainerInfo> currentQueue = containers.get(oldKey);
PriorityQueue<ContainerInfo> currentQueue = containers.get(oldKey);
// This should never happen, since we have initialized the map and
// queues to all possible states. No harm in asserting that info.
Preconditions.checkNotNull(currentQueue);
@ -368,14 +364,23 @@ public class ContainerStateManager {
currentQueue.remove(info);
}
info.setState(newState);
PriorityQueue<BlockContainerInfo> nextQueue = containers.get(newKey);
PriorityQueue<ContainerInfo> nextQueue = containers.get(newKey);
Preconditions.checkNotNull(nextQueue);
info.setLastUsed(Time.monotonicNow());
nextQueue.add(info);
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerName(info.getContainerName())
.setState(newState)
.setPipeline(info.getPipeline())
.setAllocatedBytes(info.getAllocatedBytes())
.setUsedBytes(info.getUsedBytes())
.setNumberOfKeys(info.getNumberOfKeys())
.setStateEnterTime(Time.monotonicNow())
.setOwner(info.getOwner())
.build();
Preconditions.checkNotNull(containerInfo);
nextQueue.add(containerInfo);
return newState;
return containerInfo;
} finally {
lock.writeLock().unlock();
}
@ -389,43 +394,34 @@ public class ContainerStateManager {
* @param type - Replication Type {StandAlone, Ratis}
* @param factor - Replication Factor {ONE, THREE}
* @param state - State of the Container-- {Open, Allocated etc.}
* @return BlockContainerInfo
* @return ContainerInfo
*/
public BlockContainerInfo getMatchingContainer(final long size,
public ContainerInfo getMatchingContainer(final long size,
Owner owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
lock.writeLock().lock();
try {
PriorityQueue<BlockContainerInfo> queue = containers.get(key);
PriorityQueue<ContainerInfo> queue = containers.get(key);
if (queue.size() == 0) {
// We don't have any Containers of this type.
return null;
}
Iterator<BlockContainerInfo> iter = queue.iterator();
Iterator<ContainerInfo> iter = queue.iterator();
// Two assumptions here.
// 1. The Iteration on the heap is in ordered by the last used time.
// 2. We remove and add the node back to push the node to the end of
// the queue.
while (iter.hasNext()) {
BlockContainerInfo info = iter.next();
if (info.canAllocate(size, this.containerSize)) {
ContainerInfo info = iter.next();
if (info.getAllocatedBytes() + size <= this.containerSize) {
queue.remove(info);
info.addAllocated(size);
info.setLastUsed(Time.monotonicNow());
info.allocate(size);
info.updateLastUsedTime();
queue.add(info);
return info;
} else {
if (info.getState() != LifeCycleState.CLOSED) {
// We should close this container.
LOG.info("Moving {} to containerCloseQueue.", info.toString());
info.setState(LifeCycleState.CLOSED);
containerCloseQueue.add(info);
//TODO: Next JIRA will handle these containers to close.
//TODO: move container to right queue
}
}
}
@ -436,13 +432,13 @@ public class ContainerStateManager {
}
@VisibleForTesting
public List<BlockContainerInfo> getMatchingContainers(Owner owner,
public List<ContainerInfo> getMatchingContainers(Owner owner,
ReplicationType type, ReplicationFactor factor, LifeCycleState state) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
lock.readLock().lock();
try {
return Arrays.asList((BlockContainerInfo[]) containers.get(key)
.toArray(new BlockContainerInfo[0]));
return Arrays.asList((ContainerInfo[]) containers.get(key)
.toArray(new ContainerInfo[0]));
} catch (Exception e) {
LOG.error("Could not get matching containers", e);
} finally {
@ -451,6 +447,11 @@ public class ContainerStateManager {
return null;
}
@Override
public void close() throws IOException {
//TODO: update container metadata db with actual allocated bytes values.
}
/**
* Class that acts as the container Key.
*/

View File

@ -17,7 +17,12 @@
package org.apache.hadoop.ozone.scm.container;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import java.io.Closeable;
@ -101,4 +106,18 @@ public interface Mapping extends Closeable {
* @return ContainerStateManager
*/
ContainerStateManager getStateManager();
/**
* Process container report from Datanode.
*
* @param datanodeID Datanode ID
* @param reportType Type of report
* @param containerInfos container details
*/
void processContainerReports(
DatanodeID datanodeID,
ContainerReportsRequestProto.reportType reportType,
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containerInfos) throws IOException;
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
@ -578,56 +579,64 @@ public class SCMNodeManager
private void handleHeartbeat(HeartbeatQueueItem hbItem) {
lastHBProcessedCount++;
String datanodeID = hbItem.getDatanodeID().getDatanodeUuid();
DatanodeID datanodeID = hbItem.getDatanodeID();
String datanodeUuid = datanodeID.getDatanodeUuid();
SCMNodeReport nodeReport = hbItem.getNodeReport();
long recvTimestamp = hbItem.getRecvTimestamp();
long processTimestamp = Time.monotonicNow();
if (LOG.isTraceEnabled()) {
//TODO: add average queue time of heartbeat request as metrics
LOG.trace("Processing Heartbeat from datanode {}: queueing time {}",
datanodeID, processTimestamp - recvTimestamp);
datanodeUuid, processTimestamp - recvTimestamp);
}
// If this node is already in the list of known and healthy nodes
// just set the last timestamp and return.
if (healthyNodes.containsKey(datanodeID)) {
healthyNodes.put(datanodeID, processTimestamp);
updateNodeStat(datanodeID, nodeReport);
if (healthyNodes.containsKey(datanodeUuid)) {
healthyNodes.put(datanodeUuid, processTimestamp);
updateNodeStat(datanodeUuid, nodeReport);
updateCommandQueue(datanodeID,
hbItem.getContainerReportState().getState());
return;
}
// A stale node has heartbeat us we need to remove the node from stale
// list and move to healthy list.
if (staleNodes.containsKey(datanodeID)) {
staleNodes.remove(datanodeID);
healthyNodes.put(datanodeID, processTimestamp);
if (staleNodes.containsKey(datanodeUuid)) {
staleNodes.remove(datanodeUuid);
healthyNodes.put(datanodeUuid, processTimestamp);
healthyNodeCount.incrementAndGet();
staleNodeCount.decrementAndGet();
updateNodeStat(datanodeID, nodeReport);
updateNodeStat(datanodeUuid, nodeReport);
updateCommandQueue(datanodeID,
hbItem.getContainerReportState().getState());
return;
}
// A dead node has heartbeat us, we need to remove that node from dead
// node list and move it to the healthy list.
if (deadNodes.containsKey(datanodeID)) {
deadNodes.remove(datanodeID);
healthyNodes.put(datanodeID, processTimestamp);
if (deadNodes.containsKey(datanodeUuid)) {
deadNodes.remove(datanodeUuid);
healthyNodes.put(datanodeUuid, processTimestamp);
deadNodeCount.decrementAndGet();
healthyNodeCount.incrementAndGet();
updateNodeStat(datanodeID, nodeReport);
updateNodeStat(datanodeUuid, nodeReport);
updateCommandQueue(datanodeID,
hbItem.getContainerReportState().getState());
return;
}
LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeID);
LOG.warn("SCM receive heartbeat from unregistered datanode {}",
datanodeUuid);
this.commandQueue.addCommand(hbItem.getDatanodeID(),
new ReregisterCommand());
}
private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) {
SCMNodeStat stat = nodeStats.get(datanodeID);
private void updateNodeStat(String datanodeUuid, SCMNodeReport nodeReport) {
SCMNodeStat stat = nodeStats.get(datanodeUuid);
if (stat == null) {
LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
"dead datanode {}", datanodeID);
"dead datanode {}", datanodeUuid);
stat = new SCMNodeStat();
}
@ -643,11 +652,27 @@ public class SCMNodeManager
}
scmStat.subtract(stat);
stat.set(totalCapacity, totalScmUsed, totalRemaining);
nodeStats.put(datanodeID, stat);
nodeStats.put(datanodeUuid, stat);
scmStat.add(stat);
}
}
private void updateCommandQueue(DatanodeID datanodeID,
ReportState.states containerReportState) {
if (containerReportState != null) {
switch (containerReportState) {
case completeContinerReport:
commandQueue.addCommand(datanodeID,
SendContainerCommand.newBuilder().build());
return;
case deltaContainerReport:
case noContainerReports:
default:
// do nothing
}
}
}
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.

View File

@ -1126,4 +1126,15 @@
nothing is specified. Supported values: RATIS, STAND_ALONE and CHAINED.
</description>
</property>
<property>
<name>ozone.scm.container.close.threshold</name>
<value>0.9f</value>
<tag>OZONE, SCM</tag>
<description>
This determines the threshold to be used for closing a container.
When the container used percentage reaches this threshold,
the container will be closed. Value should be a positive, non-zero
percentage in float notation (X.Yf), with 1.0f meaning 100%.
</description>
</property>
</configuration>

View File

@ -18,12 +18,16 @@ package org.apache.hadoop.ozone.scm.container;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
@ -37,9 +41,12 @@ import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Tests for Container Mapping.
@ -176,18 +183,112 @@ public class TestContainerMapping {
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
Thread.sleep(TIMEOUT + 1000);
BlockContainerInfo deletingContainer = mapping.getStateManager()
.getMatchingContainer(
0, containerInfo.getOwner(),
List<ContainerInfo> deleteContainers = mapping.getStateManager()
.getMatchingContainers(
containerInfo.getOwner(),
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETING);
Assert.assertEquals(containerInfo.getContainerName(),
deletingContainer.getContainerName());
Assert.assertTrue(deleteContainers.stream().map(
container -> container.getContainerName()).collect(
Collectors.toList()).contains(containerName));
thrown.expect(IOException.class);
thrown.expectMessage("Lease Exception");
mapping.updateContainerState(containerInfo.getContainerName(),
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
}
@Test
public void testFullContainerReport() throws IOException,
InterruptedException {
String containerName = UUID.randomUUID().toString();
createContainer(containerName);
DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
ContainerReportsRequestProto.reportType reportType =
ContainerReportsRequestProto.reportType.fullReport;
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
new ArrayList<>();
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
ciBuilder.setContainerName(containerName)
//setting some random hash
.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
.setSize(5368709120L)
.setUsed(2000000000L)
.setKeyCount(100000000L)
.setReadCount(100000000L)
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L);
reports.add(ciBuilder.build());
mapping.processContainerReports(datanodeID, reportType, reports);
ContainerInfo updatedContainer = mapping.getContainer(containerName);
Assert.assertEquals(100000000L, updatedContainer.getNumberOfKeys());
Assert.assertEquals(2000000000L, updatedContainer.getUsedBytes());
}
@Test
public void testContainerCloseWithContainerReport() throws IOException,
InterruptedException {
String containerName = UUID.randomUUID().toString();
createContainer(containerName);
DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
ContainerReportsRequestProto.reportType reportType =
ContainerReportsRequestProto.reportType.fullReport;
List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports =
new ArrayList<>();
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
ciBuilder.setContainerName(containerName)
//setting some random hash
.setFinalhash("7c45eb4d7ed5e0d2e89aaab7759de02e")
.setSize(5368709120L)
.setUsed(5368705120L)
.setKeyCount(500000000L)
.setReadCount(500000000L)
.setWriteCount(500000000L)
.setReadBytes(5368705120L)
.setWriteBytes(5368705120L);
reports.add(ciBuilder.build());
mapping.processContainerReports(datanodeID, reportType, reports);
ContainerInfo updatedContainer = mapping.getContainer(containerName);
Assert.assertEquals(500000000L, updatedContainer.getNumberOfKeys());
Assert.assertEquals(5368705120L, updatedContainer.getUsedBytes());
List<ContainerInfo> closeContainers = mapping.getStateManager()
.getMatchingContainers(
OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSED);
Assert.assertTrue(closeContainers.stream().map(
container -> container.getContainerName()).collect(
Collectors.toList()).contains(containerName));
}
/**
* Creates a container with the given name in ContainerMapping.
* @param containerName
* Name of the container
* @throws IOException
*/
private void createContainer(String containerName) throws IOException {
nodeManager.setChillmode(false);
ContainerInfo containerInfo = mapping.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerName,
OzoneProtos.Owner.OZONE);
mapping.updateContainerState(containerInfo.getContainerName(),
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
mapping.updateContainerState(containerInfo.getContainerName(),
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
}
}

View File

@ -23,8 +23,12 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
import org.junit.*;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
@ -69,12 +73,12 @@ public class TestContainerStateManager {
String container1 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1);
BlockContainerInfo info = stateManager
ContainerInfo info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(container1, info.getContainerName());
Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocated());
Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes());
Assert.assertEquals(OzoneProtos.Owner.OZONE, info.getOwner());
Assert.assertEquals(xceiverClientManager.getType(),
info.getPipeline().getType());
@ -136,7 +140,7 @@ public class TestContainerStateManager {
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2);
BlockContainerInfo info = stateManager
ContainerInfo info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);