HDDS-78. Add per volume level storage stats in SCM.

Contributed by Shashikant Banerjee.
This commit is contained in:
Anu Engineer 2018-05-26 11:06:22 -07:00
parent f24c842d52
commit 0cf6e87f92
5 changed files with 356 additions and 131 deletions

View File

@ -136,25 +136,4 @@ public boolean equals(Object to) {
public int hashCode() { public int hashCode() {
return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get()); return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get());
} }
/**
 * Cuts a value down to four decimal places by truncation (not rounding),
 * because unbounded precision tends to surprise users reading the stats.
 *
 * @param value - double to truncate.
 * @return the value with at most four decimal digits.
 */
private double truncateDecimals(double value) {
  final int multiplier = 10000;
  // Scale up, drop the fractional part through the long cast, scale back.
  long scaled = (long) (value * multiplier);
  return (double) scaled / multiplier;
}
/**
 * Computes the fraction of capacity used by SCM, truncated to four
 * decimal places.
 *
 * @return scmUsed divided by capacity.
 */
public double getScmUsedratio() {
  return truncateDecimals(getScmUsed().get() / (double) getCapacity().get());
}
} }

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.hdds.scm.node; package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
/** /**
@ -66,4 +68,10 @@ public interface SCMNodeStorageStatMXBean {
* @return long * @return long
*/ */
long getTotalFreeSpace(); long getTotalFreeSpace();
/**
* Returns the set of disks for a given Datanode.
* @param datanodeId - UUID of the Datanode whose storage volumes are wanted.
* @return set of storage volumes
* NOTE(review): the implementation visible in this change returns the map
* lookup result directly, so an untracked Datanode presumably yields null
* rather than an empty set - confirm before relying on it.
*/
Set<StorageLocationReport> getStorageVolumes(UUID datanodeId);
} }

View File

@ -22,18 +22,18 @@
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.management.ObjectName; import javax.management.ObjectName;
import java.util.ArrayList; import java.io.IOException;
import java.util.List; import java.util.*;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -52,16 +52,15 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean {
private final double warningUtilizationThreshold; private final double warningUtilizationThreshold;
private final double criticalUtilizationThreshold; private final double criticalUtilizationThreshold;
private final Map<UUID, SCMNodeStat> scmNodeStorageStatMap; private final Map<UUID, Set<StorageLocationReport>> scmNodeStorageReportMap;
// NodeStorageInfo MXBean // NodeStorageInfo MXBean
private ObjectName scmNodeStorageInfoBean; private ObjectName scmNodeStorageInfoBean;
// Aggregated node stats
private SCMNodeStat clusterStat;
/** /**
* constructs the scmNodeStorageStatMap object * constructs the scmNodeStorageReportMap object
*/ */
public SCMNodeStorageStatMap(OzoneConfiguration conf) { public SCMNodeStorageStatMap(OzoneConfiguration conf) {
scmNodeStorageStatMap = new ConcurrentHashMap<>(); // scmNodeStorageReportMap = new ConcurrentHashMap<>();
scmNodeStorageReportMap = new ConcurrentHashMap<>();
warningUtilizationThreshold = conf.getDouble( warningUtilizationThreshold = conf.getDouble(
OzoneConfigKeys. OzoneConfigKeys.
HDDS_DATANODE_STORAGE_UTILIZATION_WARNING_THRESHOLD, HDDS_DATANODE_STORAGE_UTILIZATION_WARNING_THRESHOLD,
@ -72,7 +71,6 @@ public SCMNodeStorageStatMap(OzoneConfiguration conf) {
HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD, HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD,
OzoneConfigKeys. OzoneConfigKeys.
HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT); HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT);
clusterStat = new SCMNodeStat();
} }
public enum UtilizationThreshold { public enum UtilizationThreshold {
@ -81,20 +79,22 @@ public enum UtilizationThreshold {
/** /**
* Returns true if this a datanode that is already tracked by * Returns true if this a datanode that is already tracked by
* scmNodeStorageStatMap. * scmNodeStorageReportMap.
* *
* @param datanodeID - UUID of the Datanode. * @param datanodeID - UUID of the Datanode.
* @return True if this is tracked, false if this map does not know about it. * @return True if this is tracked, false if this map does not know about it.
*/ */
public boolean isKnownDatanode(UUID datanodeID) { public boolean isKnownDatanode(UUID datanodeID) {
Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(datanodeID);
return scmNodeStorageStatMap.containsKey(datanodeID); return scmNodeStorageReportMap.containsKey(datanodeID);
} }
public List<UUID> getDatanodeList( public List<UUID> getDatanodeList(
UtilizationThreshold threshold) { UtilizationThreshold threshold) {
return scmNodeStorageStatMap.entrySet().stream() return scmNodeStorageReportMap.entrySet().stream().filter(
.filter(entry -> (isThresholdReached(threshold, entry.getValue()))) entry -> (isThresholdReached(threshold,
getScmUsedratio(getUsedSpace(entry.getKey()),
getCapacity(entry.getKey())))))
.map(Map.Entry::getKey) .map(Map.Entry::getKey)
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@ -105,19 +105,19 @@ public List<UUID> getDatanodeList(
* Insert a new datanode into Node2Container Map. * Insert a new datanode into Node2Container Map.
* *
* @param datanodeID -- Datanode UUID * @param datanodeID -- Datanode UUID
* @param stat - scmNode stat for the Datanode. * @param report - set of StorageReports.
*/ */
public void insertNewDatanode(UUID datanodeID, SCMNodeStat stat) public void insertNewDatanode(UUID datanodeID, Set<StorageLocationReport> report)
throws SCMException { throws SCMException {
Preconditions.checkNotNull(stat); Preconditions.checkNotNull(report);
Preconditions.checkState(report.size() != 0);
Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(datanodeID);
synchronized (scmNodeStorageStatMap) { synchronized (scmNodeStorageReportMap) {
if (isKnownDatanode(datanodeID)) { if (isKnownDatanode(datanodeID)) {
throw new SCMException("Node already exists in the map", throw new SCMException("Node already exists in the map",
DUPLICATE_DATANODE); DUPLICATE_DATANODE);
} }
scmNodeStorageStatMap.put(datanodeID, stat); scmNodeStorageReportMap.putIfAbsent(datanodeID, report);
clusterStat.add(stat);
} }
} }
@ -138,72 +138,103 @@ private void unregisterMXBean() {
* Updates the Container list of an existing DN. * Updates the Container list of an existing DN.
* *
* @param datanodeID - UUID of DN. * @param datanodeID - UUID of DN.
* @param stat - scmNode stat for the Datanode. * @param report - set of Storage Reports for the Datanode.
* @throws SCMException - if we don't know about this datanode, for new DN * @throws SCMException - if we don't know about this datanode, for new DN
* use insertNewDatanode. * use insertNewDatanode.
*/ */
public void updateDatanodeMap(UUID datanodeID, SCMNodeStat stat) public void updateDatanodeMap(UUID datanodeID, Set<StorageLocationReport> report)
throws SCMException { throws SCMException {
Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(datanodeID);
Preconditions.checkNotNull(stat); Preconditions.checkNotNull(report);
synchronized (scmNodeStorageStatMap) { Preconditions.checkState(report.size() != 0);
if (!scmNodeStorageStatMap.containsKey(datanodeID)) { synchronized (scmNodeStorageReportMap) {
if (!scmNodeStorageReportMap.containsKey(datanodeID)) {
throw new SCMException("No such datanode", NO_SUCH_DATANODE); throw new SCMException("No such datanode", NO_SUCH_DATANODE);
} }
SCMNodeStat removed = scmNodeStorageStatMap.get(datanodeID); scmNodeStorageReportMap.put(datanodeID, report);
clusterStat.subtract(removed);
scmNodeStorageStatMap.put(datanodeID, stat);
clusterStat.add(stat);
} }
} }
public NodeReportStatus processNodeReport(UUID datanodeID, public StorageReportResult processNodeReport(UUID datanodeID,
StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport) StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport)
throws SCMException { throws IOException {
Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(datanodeID);
Preconditions.checkNotNull(nodeReport); Preconditions.checkNotNull(nodeReport);
long totalCapacity = 0; long totalCapacity = 0;
long totalRemaining = 0; long totalRemaining = 0;
long totalScmUsed = 0; long totalScmUsed = 0;
List<StorageContainerDatanodeProtocolProtos.SCMStorageReport> Set<StorageLocationReport> storagReportSet = new HashSet<>();
Set<StorageLocationReport> fullVolumeSet = new HashSet<>();
Set<StorageLocationReport> failedVolumeSet = new HashSet<>();
List<SCMStorageReport>
storageReports = nodeReport.getStorageReportList(); storageReports = nodeReport.getStorageReportList();
for (StorageContainerDatanodeProtocolProtos.SCMStorageReport report : storageReports) { for (SCMStorageReport report : storageReports) {
StorageLocationReport storageReport =
StorageLocationReport.getFromProtobuf(report);
storagReportSet.add(storageReport);
if (report.hasFailed() && report.getFailed()) {
failedVolumeSet.add(storageReport);
} else if (isThresholdReached(UtilizationThreshold.CRITICAL,
getScmUsedratio(report.getScmUsed(), report.getCapacity()))) {
fullVolumeSet.add(storageReport);
}
totalCapacity += report.getCapacity(); totalCapacity += report.getCapacity();
totalRemaining += report.getRemaining(); totalRemaining += report.getRemaining();
totalScmUsed += report.getScmUsed(); totalScmUsed += report.getScmUsed();
} }
SCMNodeStat stat = scmNodeStorageStatMap.get(datanodeID);
if (stat == null) { if (!isKnownDatanode(datanodeID)) {
stat = new SCMNodeStat(); insertNewDatanode(datanodeID, storagReportSet);
stat.set(totalCapacity, totalScmUsed, totalRemaining);
insertNewDatanode(datanodeID, stat);
} else { } else {
stat.set(totalCapacity, totalScmUsed, totalRemaining); updateDatanodeMap(datanodeID, storagReportSet);
updateDatanodeMap(datanodeID, stat);
} }
if (isThresholdReached(UtilizationThreshold.CRITICAL, stat)) { if (isThresholdReached(UtilizationThreshold.CRITICAL,
getScmUsedratio(totalScmUsed, totalCapacity))) {
LOG.warn("Datanode {} is out of storage space. Capacity: {}, Used: {}", LOG.warn("Datanode {} is out of storage space. Capacity: {}, Used: {}",
datanodeID, stat.getCapacity().get(), stat.getScmUsed().get()); datanodeID, totalCapacity, totalScmUsed);
return NodeReportStatus.DATANODE_OUT_OF_SPACE; return StorageReportResult.ReportResultBuilder.newBuilder()
} else { .setStatus(ReportStatus.DATANODE_OUT_OF_SPACE)
if (isThresholdReached(UtilizationThreshold.WARN, stat)) { .setFullVolumeSet(fullVolumeSet).setFailedVolumeSet(failedVolumeSet)
LOG.warn("Datanode {} is low on storage space. Capacity: {}, Used: {}", .build();
datanodeID, stat.getCapacity().get(), stat.getScmUsed().get());
}
return NodeReportStatus.ALL_IS_WELL;
} }
if (isThresholdReached(UtilizationThreshold.WARN,
getScmUsedratio(totalScmUsed, totalCapacity))) {
LOG.warn("Datanode {} is low on storage space. Capacity: {}, Used: {}",
datanodeID, totalCapacity, totalScmUsed);
}
if (failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) {
return StorageReportResult.ReportResultBuilder.newBuilder()
.setStatus(ReportStatus.STORAGE_OUT_OF_SPACE)
.setFullVolumeSet(fullVolumeSet).build();
}
if (!failedVolumeSet.isEmpty() && fullVolumeSet.isEmpty()) {
return StorageReportResult.ReportResultBuilder.newBuilder()
.setStatus(ReportStatus.FAILED_STORAGE)
.setFailedVolumeSet(failedVolumeSet).build();
}
if (!failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) {
return StorageReportResult.ReportResultBuilder.newBuilder()
.setStatus(ReportStatus.FAILED_AND_OUT_OF_SPACE_STORAGE)
.setFailedVolumeSet(failedVolumeSet).setFullVolumeSet(fullVolumeSet)
.build();
}
return StorageReportResult.ReportResultBuilder.newBuilder()
.setStatus(ReportStatus.ALL_IS_WELL).build();
} }
private boolean isThresholdReached(UtilizationThreshold threshold, private boolean isThresholdReached(UtilizationThreshold threshold,
SCMNodeStat stat) { double scmUsedratio) {
switch (threshold) { switch (threshold) {
case NORMAL: case NORMAL:
return stat.getScmUsedratio() < warningUtilizationThreshold; return scmUsedratio < warningUtilizationThreshold;
case WARN: case WARN:
return stat.getScmUsedratio() >= warningUtilizationThreshold && return scmUsedratio >= warningUtilizationThreshold
stat.getScmUsedratio() < criticalUtilizationThreshold; && scmUsedratio < criticalUtilizationThreshold;
case CRITICAL: case CRITICAL:
return stat.getScmUsedratio() >= criticalUtilizationThreshold; return scmUsedratio >= criticalUtilizationThreshold;
default: default:
throw new RuntimeException("Unknown UtilizationThreshold value"); throw new RuntimeException("Unknown UtilizationThreshold value");
} }
@ -211,67 +242,120 @@ private boolean isThresholdReached(UtilizationThreshold threshold,
@Override @Override
public long getCapacity(UUID dnId) { public long getCapacity(UUID dnId) {
return scmNodeStorageStatMap.get(dnId).getCapacity().get(); long capacity = 0;
Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId);
for (StorageLocationReport report : reportSet) {
capacity += report.getCapacity();
}
return capacity;
} }
@Override @Override
public long getRemainingSpace(UUID dnId) { public long getRemainingSpace(UUID dnId) {
return scmNodeStorageStatMap.get(dnId).getRemaining().get(); long remaining = 0;
Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId);
for (StorageLocationReport report : reportSet) {
remaining += report.getRemaining();
}
return remaining;
} }
@Override @Override
public long getUsedSpace(UUID dnId) { public long getUsedSpace(UUID dnId) {
return scmNodeStorageStatMap.get(dnId).getScmUsed().get(); long scmUsed = 0;
Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId);
for (StorageLocationReport report : reportSet) {
scmUsed += report.getScmUsed();
}
return scmUsed;
} }
@Override @Override
public long getTotalCapacity() { public long getTotalCapacity() {
return clusterStat.getCapacity().get(); long capacity = 0;
Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet();
for (UUID id : dnIdSet) {
capacity += getCapacity(id);
}
return capacity;
} }
@Override @Override
public long getTotalSpaceUsed() { public long getTotalSpaceUsed() {
return clusterStat.getScmUsed().get(); long scmUsed = 0;
Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet();
for (UUID id : dnIdSet) {
scmUsed += getUsedSpace(id);
}
return scmUsed;
} }
@Override @Override
public long getTotalFreeSpace() { public long getTotalFreeSpace() {
return clusterStat.getRemaining().get(); long remaining = 0;
Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet();
for (UUID id : dnIdSet) {
remaining += getRemainingSpace(id);
}
return remaining;
} }
/** /**
* removes the dataNode from scmNodeStorageStatMap * removes the dataNode from scmNodeStorageReportMap
* @param datanodeID * @param datanodeID
* @throws SCMException in case the dataNode is not found in the map. * @throws SCMException in case the dataNode is not found in the map.
*/ */
public void removeDatanode(UUID datanodeID) throws SCMException { public void removeDatanode(UUID datanodeID) throws SCMException {
Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(datanodeID);
synchronized (scmNodeStorageStatMap) { synchronized (scmNodeStorageReportMap) {
if (!scmNodeStorageStatMap.containsKey(datanodeID)) { if (!scmNodeStorageReportMap.containsKey(datanodeID)) {
throw new SCMException("No such datanode", NO_SUCH_DATANODE); throw new SCMException("No such datanode", NO_SUCH_DATANODE);
} }
SCMNodeStat stat = scmNodeStorageStatMap.remove(datanodeID); scmNodeStorageReportMap.remove(datanodeID);
clusterStat.subtract(stat);
} }
} }
/** /**
* Gets the SCMNodeStat for the datanode * Returns the set of storage volumes for a Datanode.
* @param datanodeID * @param datanodeID
* @return SCMNodeStat * @return set of storage volumes.
*/ */
SCMNodeStat getNodeStat(UUID datanodeID) { @Override
return scmNodeStorageStatMap.get(datanodeID); public Set<StorageLocationReport> getStorageVolumes(UUID datanodeID) {
return scmNodeStorageReportMap.get(datanodeID);
} }
/**
 * Truncate to 4 digits since uncontrolled precision is sometimes
 * counter intuitive to what users expect. The value is truncated, not
 * rounded.
 *
 * @param value - double.
 * @return the value with at most four decimal digits.
 */
private double truncateDecimals(double value) {
  final int multiplier = 10000;
  // Scale up, drop the fractional part through the long cast, scale back.
  return (double) ((long) (value * multiplier)) / multiplier;
}

/**
 * Computes the used-space ratio (scmUsed / capacity), truncated to four
 * decimal places.
 *
 * <p>A capacity of zero is handled explicitly instead of being divided by:
 * the floating point division would produce NaN (0 / 0) or Infinity
 * (used / 0), and the long cast in {@code truncateDecimals} would turn
 * Infinity into a meaningless ratio of roughly 9.2e14. With zero capacity
 * the ratio is reported as 0.0 when nothing is used and as 1.0 (completely
 * full) when any space is reported as used.
 *
 * @param scmUsed - bytes used.
 * @param capacity - total capacity in bytes.
 * @return the truncated used ratio.
 */
public double getScmUsedratio(long scmUsed, long capacity) {
  if (capacity == 0) {
    return scmUsed > 0 ? 1.0 : 0.0;
  }
  return truncateDecimals(scmUsed / (double) capacity);
}
/** /**
* Results possible from processing a Node report by * Results possible from processing a Node report by
* Node2ContainerMapper. * Node2ContainerMapper.
*/ */
public enum NodeReportStatus { public enum ReportStatus {
ALL_IS_WELL, ALL_IS_WELL,
DATANODE_OUT_OF_SPACE DATANODE_OUT_OF_SPACE,
STORAGE_OUT_OF_SPACE,
FAILED_STORAGE,
FAILED_AND_OUT_OF_SPACE_STORAGE
} }
} }

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import java.util.Collections;
import java.util.Set;
/**
 * Outcome of processing a datanode's storage report in
 * {@link SCMNodeStorageStatMap}: an overall status together with the
 * volumes that were classified as full or as failed.
 *
 * <p>The volume sets are never null. A set that was not supplied to the
 * builder is exposed as an empty set, so callers can iterate or call
 * {@code isEmpty()} without a null check.
 */
public class StorageReportResult {
  private final SCMNodeStorageStatMap.ReportStatus status;
  private final Set<StorageLocationReport> fullVolumes;
  private final Set<StorageLocationReport> failedVolumes;

  /**
   * Creates a result.
   *
   * @param status - overall status of the processed report.
   * @param fullVolumes - volumes considered out of space; null is treated
   *                    as "none".
   * @param failedVolumes - volumes reported as failed; null is treated as
   *                      "none".
   */
  StorageReportResult(SCMNodeStorageStatMap.ReportStatus status,
      Set<StorageLocationReport> fullVolumes,
      Set<StorageLocationReport> failedVolumes) {
    this.status = status;
    // The builder leaves a set null unless its setter was called, which is
    // the normal case for several statuses; normalise so getters never
    // hand out null.
    this.fullVolumes = fullVolumes != null
        ? fullVolumes : Collections.<StorageLocationReport>emptySet();
    this.failedVolumes = failedVolumes != null
        ? failedVolumes : Collections.<StorageLocationReport>emptySet();
  }

  /**
   * @return overall status of the processed report.
   */
  public SCMNodeStorageStatMap.ReportStatus getStatus() {
    return status;
  }

  /**
   * @return volumes considered out of space; empty if none, never null.
   */
  public Set<StorageLocationReport> getFullVolumes() {
    return fullVolumes;
  }

  /**
   * @return volumes reported as failed; empty if none, never null.
   */
  public Set<StorageLocationReport> getFailedVolumes() {
    return failedVolumes;
  }

  /**
   * Builder for {@link StorageReportResult}. Any field whose setter is not
   * called is passed to the result as null.
   */
  static class ReportResultBuilder {
    private SCMNodeStorageStatMap.ReportStatus status;
    private Set<StorageLocationReport> fullVolumes;
    private Set<StorageLocationReport> failedVolumes;

    /**
     * @return a fresh builder with nothing set.
     */
    static ReportResultBuilder newBuilder() {
      return new ReportResultBuilder();
    }

    /**
     * @param newstatus - overall status for the result.
     * @return this builder.
     */
    public ReportResultBuilder setStatus(
        SCMNodeStorageStatMap.ReportStatus newstatus) {
      this.status = newstatus;
      return this;
    }

    /**
     * @param fullVolumes - volumes considered out of space.
     * @return this builder.
     */
    public ReportResultBuilder setFullVolumeSet(
        Set<StorageLocationReport> fullVolumes) {
      this.fullVolumes = fullVolumes;
      return this;
    }

    /**
     * @param failedVolumes - volumes reported as failed.
     * @return this builder.
     */
    public ReportResultBuilder setFailedVolumeSet(
        Set<StorageLocationReport> failedVolumes) {
      this.failedVolumes = failedVolumes;
      return this;
    }

    /**
     * @return the result assembled from the values set so far.
     */
    StorageReportResult build() {
      return new StorageReportResult(status, fullVolumes, failedVolumes);
    }
  }
}

View File

@ -17,38 +17,56 @@
*/ */
package org.apache.hadoop.hdds.scm.node; package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.*; import org.junit.*;
import org.junit.Rule;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.Set;
import java.util.ArrayList;
import java.util.HashSet;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class TestSCMNodeStorageStatMap { public class TestSCMNodeStorageStatMap {
private final static int DATANODE_COUNT = 300; private final static int DATANODE_COUNT = 100;
final long capacity = 10L * OzoneConsts.GB; final long capacity = 10L * OzoneConsts.GB;
final long used = 2L * OzoneConsts.GB; final long used = 2L * OzoneConsts.GB;
final long remaining = capacity - used; final long remaining = capacity - used;
private static OzoneConfiguration conf = new OzoneConfiguration(); private static OzoneConfiguration conf = new OzoneConfiguration();
private final Map<UUID, SCMNodeStat> testData = new ConcurrentHashMap<>(); private final Map<UUID, Set<StorageLocationReport>> testData =
new ConcurrentHashMap<>();
@Rule @Rule
public ExpectedException thrown = ExpectedException.none(); public ExpectedException thrown = ExpectedException.none();
private void generateData() { private void generateData() {
SCMNodeStat stat = new SCMNodeStat();
stat.set(capacity, used, remaining);
for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) { for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) {
testData.put(UUID.randomUUID(), stat); UUID dnId = UUID.randomUUID();
Set<StorageLocationReport> reportSet = new HashSet<>();
String path = GenericTestUtils.getTempPath(
TestSCMNodeStorageStatMap.class.getSimpleName() + "-" + Integer
.toString(dnIndex));
StorageLocationReport.Builder builder = StorageLocationReport.newBuilder();
builder.setStorageType(StorageType.DISK).setId(dnId.toString())
.setStorageLocation(path).setScmUsed(used).setRemaining(remaining)
.setCapacity(capacity).setFailed(false);
reportSet.add(builder.build());
testData.put(UUID.randomUUID(), reportSet);
} }
} }
@ -70,8 +88,8 @@ public void testIsKnownDatanode() throws SCMException {
SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
UUID knownNode = getFirstKey(); UUID knownNode = getFirstKey();
UUID unknownNode = UUID.randomUUID(); UUID unknownNode = UUID.randomUUID();
SCMNodeStat stat = testData.get(knownNode); Set<StorageLocationReport> report = testData.get(knownNode);
map.insertNewDatanode(knownNode, stat); map.insertNewDatanode(knownNode, report);
Assert.assertTrue("Not able to detect a known node", Assert.assertTrue("Not able to detect a known node",
map.isKnownDatanode(knownNode)); map.isKnownDatanode(knownNode));
Assert.assertFalse("Unknown node detected", Assert.assertFalse("Unknown node detected",
@ -82,54 +100,89 @@ public void testIsKnownDatanode() throws SCMException {
public void testInsertNewDatanode() throws SCMException { public void testInsertNewDatanode() throws SCMException {
SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
UUID knownNode = getFirstKey(); UUID knownNode = getFirstKey();
SCMNodeStat stat = testData.get(knownNode); Set<StorageLocationReport> report = testData.get(knownNode);
map.insertNewDatanode(knownNode, stat); map.insertNewDatanode(knownNode, report);
Assert.assertEquals(map.getNodeStat(knownNode).getScmUsed(), Assert.assertEquals(map.getStorageVolumes(knownNode),
testData.get(knownNode).getScmUsed()); testData.get(knownNode));
thrown.expect(SCMException.class); thrown.expect(SCMException.class);
thrown.expectMessage("already exists"); thrown.expectMessage("already exists");
map.insertNewDatanode(knownNode, stat); map.insertNewDatanode(knownNode, report);
} }
@Test @Test
public void testUpdateUnknownDatanode() throws SCMException { public void testUpdateUnknownDatanode() throws SCMException {
SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
UUID unknownNode = UUID.randomUUID(); UUID unknownNode = UUID.randomUUID();
SCMNodeStat stat = new SCMNodeStat(); String path = GenericTestUtils.getTempPath(
TestSCMNodeStorageStatMap.class.getSimpleName() + "-" + unknownNode
.toString());
Set<StorageLocationReport> reportSet = new HashSet<>();
StorageLocationReport.Builder builder = StorageLocationReport.newBuilder();
builder.setStorageType(StorageType.DISK).setId(unknownNode.toString())
.setStorageLocation(path).setScmUsed(used).setRemaining(remaining)
.setCapacity(capacity).setFailed(false);
reportSet.add(builder.build());
thrown.expect(SCMException.class); thrown.expect(SCMException.class);
thrown.expectMessage("No such datanode"); thrown.expectMessage("No such datanode");
map.updateDatanodeMap(unknownNode, stat); map.updateDatanodeMap(unknownNode, reportSet);
} }
@Test @Test
public void testProcessNodeReportCheckOneNode() throws SCMException { public void testProcessNodeReportCheckOneNode() throws IOException {
UUID key = getFirstKey(); UUID key = getFirstKey();
SCMNodeStat value = testData.get(key); List<SCMStorageReport> reportList = new ArrayList<>();
Set<StorageLocationReport> reportSet = testData.get(key);
SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
map.insertNewDatanode(key, value); map.insertNewDatanode(key, reportSet);
Assert.assertTrue(map.isKnownDatanode(key)); Assert.assertTrue(map.isKnownDatanode(key));
String storageId = UUID.randomUUID().toString(); String storageId = UUID.randomUUID().toString();
String path = String path =
GenericTestUtils.getRandomizedTempPath().concat("/" + storageId); GenericTestUtils.getRandomizedTempPath().concat("/" + storageId);
long capacity = value.getCapacity().get(); StorageLocationReport report = reportSet.iterator().next();
long used = value.getScmUsed().get(); long capacity = report.getCapacity();
long remaining = value.getRemaining().get(); long used = report.getScmUsed();
long remaining = report.getRemaining();
List<SCMStorageReport> reports = TestUtils List<SCMStorageReport> reports = TestUtils
.createStorageReport(capacity, used, remaining, path, null, storageId, .createStorageReport(capacity, used, remaining, path, null, storageId,
1); 1);
SCMNodeStorageStatMap.NodeReportStatus status = StorageReportResult result =
map.processNodeReport(key, TestUtils.createNodeReport(reports)); map.processNodeReport(key, TestUtils.createNodeReport(reports));
Assert.assertEquals(status, Assert.assertEquals(result.getStatus(),
SCMNodeStorageStatMap.NodeReportStatus.ALL_IS_WELL); SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL);
StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb =
SCMNodeReport.newBuilder();
SCMStorageReport srb = reportSet.iterator().next().getProtoBufMessage();
reportList.add(srb);
result = map.processNodeReport(key, TestUtils.createNodeReport(reportList));
Assert.assertEquals(result.getStatus(),
SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL);
reportList.add(TestUtils
.createStorageReport(capacity, capacity, 0, path, null,
UUID.randomUUID().toString(), 1).get(0));
result = map.processNodeReport(key, TestUtils.createNodeReport(reportList));
Assert.assertEquals(result.getStatus(),
SCMNodeStorageStatMap.ReportStatus.STORAGE_OUT_OF_SPACE);
// Mark a disk failed
SCMStorageReport srb2 = SCMStorageReport.newBuilder()
.setStorageUuid(UUID.randomUUID().toString())
.setStorageLocation(srb.getStorageLocation()).setScmUsed(capacity)
.setCapacity(capacity).setRemaining(0).setFailed(true).build();
reportList.add(srb2);
nrb.addAllStorageReport(reportList);
result = map.processNodeReport(key, nrb.addStorageReport(srb).build());
Assert.assertEquals(result.getStatus(),
SCMNodeStorageStatMap.ReportStatus.FAILED_AND_OUT_OF_SPACE_STORAGE);
} }
@Test @Test
public void testProcessNodeReportAndSCMStats() throws SCMException { public void testProcessMultipleNodeReports() throws SCMException {
SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf);
int counter = 1; int counter = 1;
// Insert all testData into the SCMNodeStorageStatMap Map. // Insert all testData into the SCMNodeStorageStatMap Map.
for (Map.Entry<UUID, SCMNodeStat> keyEntry : testData.entrySet()) { for (Map.Entry<UUID, Set<StorageLocationReport>> keyEntry : testData
.entrySet()) {
map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue()); map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue());
} }
Assert.assertEquals(DATANODE_COUNT * capacity, map.getTotalCapacity()); Assert.assertEquals(DATANODE_COUNT * capacity, map.getTotalCapacity());
@ -137,9 +190,21 @@ public void testProcessNodeReportAndSCMStats() throws SCMException {
Assert.assertEquals(DATANODE_COUNT * used, map.getTotalSpaceUsed()); Assert.assertEquals(DATANODE_COUNT * used, map.getTotalSpaceUsed());
// upadate 1/4th of the datanode to be full // upadate 1/4th of the datanode to be full
for (Map.Entry<UUID, SCMNodeStat> keyEntry : testData.entrySet()) { for (Map.Entry<UUID, Set<StorageLocationReport>> keyEntry : testData
SCMNodeStat stat = new SCMNodeStat(capacity, capacity, 0); .entrySet()) {
map.updateDatanodeMap(keyEntry.getKey(), stat); Set<StorageLocationReport> reportSet = new HashSet<>();
String path = GenericTestUtils.getTempPath(
TestSCMNodeStorageStatMap.class.getSimpleName() + "-" + keyEntry
.getKey().toString());
StorageLocationReport.Builder builder =
StorageLocationReport.newBuilder();
builder.setStorageType(StorageType.DISK)
.setId(keyEntry.getKey().toString()).setStorageLocation(path)
.setScmUsed(capacity).setRemaining(0).setCapacity(capacity)
.setFailed(false);
reportSet.add(builder.build());
map.updateDatanodeMap(keyEntry.getKey(), reportSet);
counter++; counter++;
if (counter > DATANODE_COUNT / 4) { if (counter > DATANODE_COUNT / 4) {
break; break;
@ -163,7 +228,8 @@ public void testProcessNodeReportAndSCMStats() throws SCMException {
map.getTotalSpaceUsed(), 0); map.getTotalSpaceUsed(), 0);
counter = 1; counter = 1;
// Remove 1/4 of the DataNodes from the Map // Remove 1/4 of the DataNodes from the Map
for (Map.Entry<UUID, SCMNodeStat> keyEntry : testData.entrySet()) { for (Map.Entry<UUID, Set<StorageLocationReport>> keyEntry : testData
.entrySet()) {
map.removeDatanode(keyEntry.getKey()); map.removeDatanode(keyEntry.getKey());
counter++; counter++;
if (counter > DATANODE_COUNT / 4) { if (counter > DATANODE_COUNT / 4) {
@ -181,12 +247,13 @@ public void testProcessNodeReportAndSCMStats() throws SCMException {
map.getDatanodeList(SCMNodeStorageStatMap.UtilizationThreshold.NORMAL) map.getDatanodeList(SCMNodeStorageStatMap.UtilizationThreshold.NORMAL)
.size(), 0); .size(), 0);
Assert.assertEquals(0.75 * DATANODE_COUNT * capacity, map.getTotalCapacity(), 0); Assert
.assertEquals(0.75 * DATANODE_COUNT * capacity, map.getTotalCapacity(),
0);
Assert.assertEquals(0.75 * DATANODE_COUNT * remaining, Assert.assertEquals(0.75 * DATANODE_COUNT * remaining,
map.getTotalFreeSpace(), 0); map.getTotalFreeSpace(), 0);
Assert.assertEquals( Assert
0.75 * DATANODE_COUNT * used , .assertEquals(0.75 * DATANODE_COUNT * used, map.getTotalSpaceUsed(), 0);
map.getTotalSpaceUsed(), 0);
} }
} }