HDFS-12411. Ozone: Add container usage information to DN container report. Contributed by Xiaoyu Yao.

This commit is contained in:
Xiaoyu Yao 2017-10-13 10:51:29 -07:00 committed by Owen O'Malley
parent fb545e4291
commit a467017068
41 changed files with 945 additions and 223 deletions

View File

@ -156,6 +156,11 @@ public final class OzoneConfigKeys {
public static final int
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10;
public static final String OZONE_CONTAINER_REPORT_INTERVAL_MS =
"ozone.container.report.interval.ms";
public static final int OZONE_CONTAINER_REPORT_INTERVAL_MS_DEFAULT =
60000;
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

View File

@ -195,8 +195,6 @@ message ContainerCommandResponseProto {
}
message ContainerData {
required string name = 1;
repeated KeyValue metadata = 2;
@ -204,6 +202,8 @@ message ContainerData {
optional string containerPath = 4;
optional bool open = 5 [default = true];
optional string hash = 6;
optional int64 bytesUsed = 7;
optional int64 size = 8;
}
message ContainerMeta {

View File

@ -19,14 +19,18 @@
package org.apache.hadoop.ozone.container.common.helpers;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* This class maintains the information about a container in the ozone world.
@ -43,16 +47,22 @@ public class ContainerData {
private String containerFilePath;
private boolean open;
private String hash;
private AtomicLong bytesUsed;
private long maxSize;
/**
* Constructs a ContainerData Object.
*
* @param containerName - Name
*/
public ContainerData(String containerName) {
public ContainerData(String containerName, Configuration conf) {
this.metadata = new TreeMap<>();
this.containerName = containerName;
this.open = true;
this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
this.bytesUsed = new AtomicLong(0L);
}
/**
@ -62,8 +72,9 @@ public ContainerData(String containerName) {
* @throws IOException
*/
public static ContainerData getFromProtBuf(
ContainerProtos.ContainerData protoData) throws IOException {
ContainerData data = new ContainerData(protoData.getName());
ContainerProtos.ContainerData protoData, Configuration conf)
throws IOException {
ContainerData data = new ContainerData(protoData.getName(), conf);
for (int x = 0; x < protoData.getMetadataCount(); x++) {
data.addMetadata(protoData.getMetadata(x).getKey(),
protoData.getMetadata(x).getValue());
@ -86,6 +97,14 @@ public static ContainerData getFromProtBuf(
if(protoData.hasHash()) {
data.setHash(protoData.getHash());
}
if (protoData.hasBytesUsed()) {
data.setBytesUsed(protoData.getBytesUsed());
}
if (protoData.hasSize()) {
data.setMaxSize(protoData.getSize());
}
return data;
}
@ -120,6 +139,13 @@ public ContainerProtos.ContainerData getProtoBufMessage() {
.setValue(entry.getValue()).build());
}
if (this.getBytesUsed() >= 0) {
builder.setBytesUsed(this.getBytesUsed());
}
if (this.getMaxSize() >= 0) {
builder.setSize(this.getMaxSize());
}
return builder.build();
}
@ -251,8 +277,6 @@ public void setHash(String hash) {
this.hash = hash;
}
/**
* Sets the open or closed values.
* @param open
@ -261,4 +285,27 @@ public synchronized void setOpen(boolean open) {
this.open = open;
}
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
}
public long getMaxSize() {
return maxSize;
}
public long getKeyCount() {
return metadata.size();
}
public void setBytesUsed(long used) {
this.bytesUsed.set(used);
}
public long addBytesUsed(long delta) {
return this.bytesUsed.addAndGet(delta);
}
public long getBytesUsed() {
return bytesUsed.get();
}
}

View File

@ -23,14 +23,6 @@
/**
* Container Report iterates the closed containers and sends a container report
* to SCM.
* <p>
* The protobuf counter part of this class looks like this.
* message ContainerInfo {
* required string containerName = 1;
* required string finalhash = 2;
* optional int64 size = 3;
* optional int64 keycount = 4;
* }
*/
public class ContainerReport {
private static final int UNKNOWN = -1;
@ -38,6 +30,12 @@ public class ContainerReport {
private final String finalhash;
private long size;
private long keyCount;
private long bytesUsed;
private long readCount;
private long writeCount;
private long readBytes;
private long writeBytes;
/**
* Constructs the ContainerReport.
@ -50,6 +48,11 @@ public ContainerReport(String containerName, String finalhash) {
this.finalhash = finalhash;
this.size = UNKNOWN;
this.keyCount = UNKNOWN;
this.bytesUsed = 0L;
this.readCount = 0L;
this.readBytes = 0L;
this.writeCount = 0L;
this.writeBytes = 0L;
}
/**
@ -65,9 +68,25 @@ public static ContainerReport getFromProtoBuf(ContainerInfo info) {
if (info.hasSize()) {
report.setSize(info.getSize());
}
if (info.hasKeycount()) {
report.setKeyCount(info.getKeycount());
if (info.hasKeyCount()) {
report.setKeyCount(info.getKeyCount());
}
if (info.hasUsed()) {
report.setBytesUsed(info.getUsed());
}
if (info.hasReadCount()) {
report.setReadCount(info.getReadCount());
}
if (info.hasReadBytes()) {
report.setReadBytes(info.getReadBytes());
}
if (info.hasWriteCount()) {
report.setWriteCount(info.getWriteCount());
}
if (info.hasWriteBytes()) {
report.setWriteBytes(info.getWriteBytes());
}
return report;
}
@ -125,6 +144,46 @@ public void setKeyCount(long keyCount) {
this.keyCount = keyCount;
}
public long getReadCount() {
return readCount;
}
public void setReadCount(long readCount) {
this.readCount = readCount;
}
public long getWriteCount() {
return writeCount;
}
public void setWriteCount(long writeCount) {
this.writeCount = writeCount;
}
public long getReadBytes() {
return readBytes;
}
public void setReadBytes(long readBytes) {
this.readBytes = readBytes;
}
public long getWriteBytes() {
return writeBytes;
}
public void setWriteBytes(long writeBytes) {
this.writeBytes = writeBytes;
}
public long getBytesUsed() {
return bytesUsed;
}
public void setBytesUsed(long bytesUsed) {
this.bytesUsed = bytesUsed;
}
/**
* Gets a containerInfo protobuf message from ContainerReports.
*
@ -133,8 +192,13 @@ public void setKeyCount(long keyCount) {
public ContainerInfo getProtoBufMessage() {
return ContainerInfo.newBuilder()
.setContainerName(this.getContainerName())
.setKeycount(this.getKeyCount())
.setKeyCount(this.getKeyCount())
.setSize(this.getSize())
.setUsed(this.getBytesUsed())
.setReadCount(this.getReadCount())
.setReadBytes(this.getReadBytes())
.setWriteCount(this.getWriteCount())
.setWriteBytes(this.getWriteBytes())
.setFinalhash(this.getFinalhash())
.build();
}

View File

@ -158,4 +158,13 @@ public String getKeyName() {
public void setChunks(List<ContainerProtos.ChunkInfo> chunks) {
this.chunks = chunks;
}
/**
* Get the total size of chunks allocated for the key.
* @return total size of the key.
*/
public long getSize() {
return chunks.parallelStream().mapToLong(e->e.getLen()).sum();
}
}

View File

@ -19,6 +19,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
@ -29,6 +30,7 @@
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ExecutionException;
@ -67,19 +69,24 @@ public ChunkManagerImpl(ContainerManager manager) {
public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info,
byte[] data)
throws StorageContainerException {
// we don't want container manager to go away while we are writing chunks.
containerManager.readLock();
// TODO : Take keyManager Write lock here.
try {
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
Preconditions.checkNotNull(pipeline.getContainerName(),
String containerName = pipeline.getContainerName();
Preconditions.checkNotNull(containerName,
"Container name cannot be null");
File chunkFile = ChunkUtils.validateChunk(pipeline,
containerManager.readContainer(pipeline.getContainerName()), info);
ContainerData container =
containerManager.readContainer(containerName);
File chunkFile = ChunkUtils.validateChunk(pipeline, container, info);
long oldSize = chunkFile.length();
ChunkUtils.writeData(chunkFile, info, data);
containerManager.incrWriteBytes(containerName, info.getLen());
containerManager.incrWriteCount(containerName);
long newSize = chunkFile.length();
containerManager.incrBytesUsed(containerName, newSize - oldSize);
} catch (ExecutionException | NoSuchAlgorithmException e) {
LOG.error("write data failed. error: {}", e);
throw new StorageContainerException("Internal error: ", e,
@ -110,9 +117,17 @@ public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info)
throws StorageContainerException {
containerManager.readLock();
try {
File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
.readContainer(pipeline.getContainerName()), info);
return ChunkUtils.readData(chunkFile, info).array();
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
String containerName = pipeline.getContainerName();
Preconditions.checkNotNull(containerName,
"Container name cannot be null");
ContainerData container =
containerManager.readContainer(containerName);
File chunkFile = ChunkUtils.getChunkFile(pipeline, container, info);
ByteBuffer data = ChunkUtils.readData(chunkFile, info);
containerManager.incrReadCount(containerName);
containerManager.incrReadBytes(containerName, chunkFile.length());
return data.array();
} catch (ExecutionException | NoSuchAlgorithmException e) {
LOG.error("read data failed. error: {}", e);
throw new StorageContainerException("Internal error: ",
@ -138,13 +153,17 @@ public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info)
@Override
public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info)
throws StorageContainerException {
containerManager.readLock();
try {
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
String containerName = pipeline.getContainerName();
Preconditions.checkNotNull(containerName,
"Container name cannot be null");
File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
.readContainer(pipeline.getContainerName()), info);
.readContainer(containerName), info);
if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
FileUtil.fullyDelete(chunkFile);
containerManager.decrBytesUsed(containerName, chunkFile.length());
} else {
LOG.error("Not Supported Operation. Trying to delete a " +
"chunk that is in shared file. chunk info : " + info.toString());

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerReportManager;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.util.Time;
import java.util.concurrent.atomic.AtomicLong;
/**
* Class wraps the container report operations on datanode.
* // TODO: support incremental/delta container report
*/
public class ContainerReportManagerImpl implements ContainerReportManager {
private Configuration config;
// Last non-empty container report time
private long lastContainerReportTime;
private final long containerReportInterval;
private AtomicLong reportCount;
private static final ReportState NO_CONTAINER_REPORTSTATE =
ReportState.newBuilder()
.setState(ReportState.states.noContainerReports)
.setCount(0).build();
public ContainerReportManagerImpl(Configuration config) {
this.config = config;
this.lastContainerReportTime = -1;
this.reportCount = new AtomicLong(0L);
this.containerReportInterval = config.getLong(
OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_MS,
OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_MS_DEFAULT);
}
public ReportState getContainerReportState() {
if (lastContainerReportTime < 0) {
return getFullContainerReportState();
} else {
// Add a random delay (0~30s) on top of the container report
// interval (60s) so tha the SCM is overwhelmed by the container reports
// sent in sync.
if (Time.monotonicNow() - lastContainerReportTime >
(containerReportInterval + getRandomReportDelay())) {
return getFullContainerReportState();
} else {
return getNoContainerReportState();
}
}
}
private ReportState getFullContainerReportState() {
ReportState.Builder rsBuilder = ReportState.newBuilder();
rsBuilder.setState(ReportState.states.completeContinerReport);
rsBuilder.setCount(reportCount.incrementAndGet());
this.lastContainerReportTime = Time.monotonicNow();
return rsBuilder.build();
}
private ReportState getNoContainerReportState() {
return NO_CONTAINER_REPORTSTATE;
}
private long getRandomReportDelay() {
return RandomUtils.nextLong(0,
OzoneClientUtils.getScmHeartbeatInterval(config));
}
}

View File

@ -19,8 +19,10 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import java.util.concurrent.atomic.AtomicLong;
/**
* This is an immutable class that represents the state of a container. if the
* This class represents the state of a container. if the
* container reading encountered an error when we boot up we will post that
* info to a recovery queue and keep the info in the containerMap.
* <p/>
@ -36,6 +38,14 @@ public class ContainerStatus {
*/
private int numPendingDeletionBlocks;
private AtomicLong readBytes;
private AtomicLong writeBytes;
private AtomicLong readCount;
private AtomicLong writeCount;
/**
* Creates a Container Status class.
*
@ -44,6 +54,10 @@ public class ContainerStatus {
ContainerStatus(ContainerData containerData) {
this.numPendingDeletionBlocks = 0;
this.containerData = containerData;
this.readCount = new AtomicLong(0L);
this.readBytes = new AtomicLong(0L);
this.writeCount = new AtomicLong(0L);
this.writeBytes = new AtomicLong(0L);
}
/**
@ -80,4 +94,124 @@ public void decrPendingDeletionBlocks(int numBlocks) {
public int getNumPendingDeletionBlocks() {
return this.numPendingDeletionBlocks;
}
/**
* Get the number of bytes read from the container.
* @return the number of bytes read from the container.
*/
public long getReadBytes() {
return readBytes.get();
}
/**
* Increase the number of bytes read from the container.
* @param bytes number of bytes read.
*/
public void incrReadBytes(long bytes) {
this.readBytes.addAndGet(bytes);
}
/**
* Get the number of times the container is read.
* @return the number of times the container is read.
*/
public long getReadCount() {
return readCount.get();
}
/**
* Increase the number of container read count by 1.
*/
public void incrReadCount() {
this.readCount.incrementAndGet();
}
/**
* Get the number of bytes write into the container.
* @return the number of bytes write into the container.
*/
public long getWriteBytes() {
return writeBytes.get();
}
/**
* Increase the number of bytes write into the container.
* @param bytes the number of bytes write into the container.
*/
public void incrWriteBytes(long bytes) {
this.writeBytes.addAndGet(bytes);
}
/**
* Get the number of writes into the container.
* @return the number of writes into the container.
*/
public long getWriteCount() {
return writeCount.get();
}
/**
* Increase the number of writes into the container by 1.
*/
public void incrWriteCount() {
this.writeCount.incrementAndGet();
}
/**
* Get the number of bytes used by the container.
* @return the number of bytes used by the container.
*/
public long getBytesUsed() {
return containerData.getBytesUsed();
}
/**
* Increase the number of bytes used by the container.
* @param used number of bytes used by the container.
* @return the current number of bytes used by the container afert increase.
*/
public long incrBytesUsed(long used) {
return containerData.addBytesUsed(used);
}
/**
* Set the number of bytes used by the container.
* @param used the number of bytes used by the container.
*/
public void setBytesUsed(long used) {
containerData.setBytesUsed(used);
}
/**
* Decrease the number of bytes used by the container.
* @param reclaimed the number of bytes reclaimed from the container.
* @return the current number of bytes used by the container after decrease.
*/
public long decrBytesUsed(long reclaimed) {
return this.containerData.addBytesUsed(-1L * reclaimed);
}
/**
* Get the maximum container size.
* @return the maximum container size.
*/
public long getMaxSize() {
return containerData.getMaxSize();
}
/**
* Set the maximum container size.
* @param size the maximum container size.
*/
public void setMaxSize(long size) {
this.containerData.setMaxSize(size);
}
/**
* Get the number of keys in the container.
* @return the number of keys in the container.
*/
public long getNumKeys() {
return containerData.getKeyCount();
}
}

View File

@ -320,7 +320,7 @@ private ContainerCommandResponseProto handleUpdateContainer(
.getContainerData().getName();
ContainerData data = ContainerData.getFromProtBuf(
msg.getUpdateContainer().getContainerData());
msg.getUpdateContainer().getContainerData(), conf);
boolean forceUpdate = msg.getUpdateContainer().getForceUpdate();
this.containerManager.updateContainer(
pipeline, containerName, data, forceUpdate);
@ -389,7 +389,7 @@ private ContainerCommandResponseProto handleCreateContainer(
return ContainerUtils.malformedRequest(msg);
}
ContainerData cData = ContainerData.getFromProtBuf(
msg.getCreateContainer().getContainerData());
msg.getCreateContainer().getContainerData(), conf);
Preconditions.checkNotNull(cData, "Container data is null");
Pipeline pipeline = Pipeline.getFromProtoBuf(

View File

@ -78,10 +78,10 @@ public void putKey(Pipeline pipeline, KeyData data) throws IOException {
// We are not locking the key manager since LevelDb serializes all actions
// against a single DB. We rely on DB level locking to avoid conflicts.
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
Preconditions.checkNotNull(pipeline.getContainerName(),
String containerName = pipeline.getContainerName();
Preconditions.checkNotNull(containerName,
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(
pipeline.getContainerName());
ContainerData cData = containerManager.readContainer(containerName);
MetadataStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
@ -92,7 +92,6 @@ public void putKey(Pipeline pipeline, KeyData data) throws IOException {
} finally {
containerManager.readUnlock();
}
}
/**
@ -135,10 +134,10 @@ public void deleteKey(Pipeline pipeline, String keyName)
containerManager.readLock();
try {
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
Preconditions.checkNotNull(pipeline.getContainerName(),
String containerName = pipeline.getContainerName();
Preconditions.checkNotNull(containerName,
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(pipeline
.getContainerName());
ContainerData cData = containerManager.readContainer(containerName);
MetadataStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.

View File

@ -21,8 +21,11 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
@ -44,10 +47,11 @@ public interface ContainerManager extends RwLock {
*
* @param config - Configuration.
* @param containerDirs - List of Metadata Container locations.
* @param datanodeID - Datanode ID
* @throws StorageContainerException
*/
void init(Configuration config, List<StorageLocation> containerDirs)
throws IOException;
void init(Configuration config, List<StorageLocation> containerDirs,
DatanodeID datanodeID) throws IOException;
/**
* Creates a container with the given name.
@ -173,6 +177,13 @@ void closeContainer(String containerName)
*/
SCMNodeReport getNodeReport() throws IOException;
/**
* Gets container report.
* @return container report.
* @throws IOException
*/
ContainerReportsRequestProto getContainerReport() throws IOException;
/**
* Gets container reports.
* @return List of all closed containers.
@ -199,4 +210,67 @@ void closeContainer(String containerName)
* container id
*/
void decrPendingDeletionBlocks(int numBlocks, String containerId);
/**
* Increase the read count of the container.
* @param containerName - Name of the container.
*/
void incrReadCount(String containerName);
/**
* Increse the read counter for bytes read from the container.
* @param containerName - Name of the container.
* @param readBytes - bytes read from the container.
*/
void incrReadBytes(String containerName, long readBytes);
/**
* Increase the write count of the container.
* @param containerName - Name of the container.
*/
void incrWriteCount(String containerName);
/**
* Increase the write counter for bytes write into the container.
* @param containerName - Name of the container.
* @param writeBytes - bytes write into the container.
*/
void incrWriteBytes(String containerName, long writeBytes);
/**
* Increase the bytes used by the container.
* @param containerName - Name of the container.
* @param used - additional bytes used by the container.
* @return the current bytes used.
*/
long incrBytesUsed(String containerName, long used);
/**
* Decrease the bytes used by the container.
* @param containerName - Name of the container.
* @param used - additional bytes reclaimed by the container.
* @return the current bytes used.
*/
long decrBytesUsed(String containerName, long used);
/**
* Get the bytes used by the container.
* @param containerName - Name of the container.
* @return the current bytes used by the container.
*/
long getBytesUsed(String containerName);
/**
* Get the number of keys in the container.
* @param containerName - Name of the container.
* @return the current key count.
*/
long getNumKeys(String containerName);
/**
* Get the container report state to send via HB to SCM.
* @return container report state.
*/
ReportState getContainerReportState();
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.interfaces;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
/**
* Interface for container report manager operations.
*/
public interface ContainerReportManager {
/**
* Get the container report state.
* @return the container report state.
*/
ReportState getContainerReportState();
}

View File

@ -130,6 +130,7 @@ private void start() throws IOException {
LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
nextHB.set(Time.monotonicNow() + heartbeatFrequency);
context.setReportState(container.getNodeReport());
context.setContainerReportState(container.getContainerReportState());
context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS);
now = Time.monotonicNow();

View File

@ -16,21 +16,18 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* Container Report handler.
@ -62,22 +59,13 @@ public void handle(SCMCommand command, OzoneContainer container,
invocationCount++;
long startTime = Time.monotonicNow();
try {
ContainerReportsProto.Builder contianerReportsBuilder =
ContainerReportsProto.newBuilder();
List<ContainerData> closedContainerList = container.getContainerReports();
for (ContainerData cd : closedContainerList) {
ContainerReport report =
new ContainerReport(cd.getContainerName(), cd.getHash());
contianerReportsBuilder.addReports(report.getProtoBufMessage());
}
contianerReportsBuilder.setType(ContainerReportsProto.reportType
.fullReport);
ContainerReportsRequestProto contianerReport =
container.getContainerReport();
// TODO : We send this report to all SCMs.Check if it is enough only to
// send to the leader once we have RAFT enabled SCMs.
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
endPoint.getEndPoint().sendContainerReport(
contianerReportsBuilder.build());
endPoint.getEndPoint().sendContainerReport(contianerReport);
}
} catch (IOException ex) {
LOG.error("Unable to process the Container Report command.", ex);

View File

@ -36,8 +36,9 @@
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -96,7 +97,7 @@ public OzoneContainer(DatanodeID datanodeID, Configuration ozoneConfig) throws
}
manager = new ContainerManagerImpl();
manager.init(this.ozoneConfig, locations);
manager.init(this.ozoneConfig, locations, datanodeID);
this.chunkManager = new ChunkManagerImpl(manager);
manager.setChunkManager(this.chunkManager);
@ -234,6 +235,16 @@ public int getRatisContainerServerPort() {
return getPortbyType(OzoneProtos.ReplicationType.RATIS);
}
/**
* Returns container report.
* @return - container report.
* @throws IOException
*/
public ContainerReportsRequestProto getContainerReport() throws IOException {
return this.manager.getContainerReport();
}
// TODO: remove getContainerReports
/**
* Returns the list of closed containers.
* @return - List of closed containers.
@ -247,4 +258,12 @@ public List<ContainerData> getContainerReports() throws IOException {
public ContainerManager getContainerManager() {
return this.manager;
}
/**
* Get the container report state to send via HB to SCM.
* @return the container report state.
*/
public ReportState getContainerReportState() {
return this.manager.getContainerReportState();
}
}

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
@ -459,10 +460,9 @@ private VolumeList getAllVolumes() throws IOException {
public List<BlockGroup> getPendingDeletionKeys(final int count)
throws IOException {
List<BlockGroup> keyBlocksList = Lists.newArrayList();
final MetadataKeyFilter deletingKeyFilter =
new KeyPrefixFilter(DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> rangeResult =
store.getSequentialRangeKVs(null, count, deletingKeyFilter);
store.getRangeKVs(null, count,
MetadataKeyFilters.getDeletingKeyFilter());
for (Map.Entry<byte[], byte[]> entry : rangeResult) {
KsmKeyInfo info =
KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));

View File

@ -18,7 +18,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@ -65,12 +66,12 @@ SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
/**
* Send a container report.
* @param reports -- Container report
* @return HeartbeatRespose.nullcommand.
* @param reports -- Container report.
* @return container reports response.
* @throws IOException
*/
SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports)
throws IOException;
ContainerReportsResponseProto sendContainerReport(
ContainerReportsRequestProto reports) throws IOException;
/**
* Used by datanode to send block deletion ACK to SCM.

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
@ -56,9 +57,10 @@ public interface StorageContainerNodeProtocol {
* Send heartbeat to indicate the datanode is alive and doing well.
* @param datanodeID - Datanode ID.
* @param nodeReport - node report.
* @param reportState - container report.
* @return SCMheartbeat response list
*/
List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport);
SCMNodeReport nodeReport, ReportState reportState);
}

View File

@ -23,7 +23,8 @@
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
@ -168,9 +169,9 @@ public SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
* @throws IOException
*/
@Override
public SCMHeartbeatResponseProto sendContainerReport(
ContainerReportsProto reports) throws IOException {
final SCMHeartbeatResponseProto resp;
public ContainerReportsResponseProto sendContainerReport(
ContainerReportsRequestProto reports) throws IOException {
final ContainerReportsResponseProto resp;
try {
resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports);
} catch (ServiceException e) {

View File

@ -21,9 +21,10 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@ -89,9 +90,8 @@ public StorageContainerDatanodeProtocolServerSideTranslatorPB(
}
@Override
public SCMHeartbeatResponseProto
sendContainerReport(RpcController controller,
ContainerReportsProto request)
public ContainerReportsResponseProto sendContainerReport(
RpcController controller, ContainerReportsRequestProto request)
throws ServiceException {
try {
return impl.sendContainerReport(request);

View File

@ -43,7 +43,8 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@ -716,7 +717,7 @@ public SCMVersionResponseProto getVersion(
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport, ReportState reportState) throws IOException {
List<SCMCommand> commands =
getScmNodeManager().sendHeartbeat(datanodeID, nodeReport);
getScmNodeManager().sendHeartbeat(datanodeID, nodeReport, reportState);
List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
for (SCMCommand cmd : commands) {
cmdResponses.add(getCommandResponse(cmd));
@ -749,13 +750,12 @@ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
* @throws IOException
*/
@Override
public SCMHeartbeatResponseProto
sendContainerReport(ContainerReportsProto reports) throws IOException {
// TODO : fix this in the server side code changes for handling this request
// correctly.
List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
.build();
public ContainerReportsResponseProto sendContainerReport(
ContainerReportsRequestProto reports) throws IOException {
// TODO: handle the container reports either here or add container report
// handler.
return ContainerReportsResponseProto.newBuilder().build();
}
/**

View File

@ -22,7 +22,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
@ -260,7 +260,8 @@ private void initPoolProcessThread() {
* @param containerReport -- Container report for a specific container from
* a datanode.
*/
public void handleContainerReport(ContainerReportsProto containerReport) {
public void handleContainerReport(
ContainerReportsRequestProto containerReport) {
String poolName = null;
DatanodeID datanodeID = DatanodeID
.getFromProtoBuf(containerReport.getDatanodeID());

View File

@ -22,7 +22,7 @@
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.node.CommandQueue;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
@ -234,11 +234,13 @@ private NodeState getNodestate(DatanodeID id) {
*
* @param containerReport - ContainerReport
*/
public void handleContainerReport(ContainerReportsProto containerReport) {
public void handleContainerReport(
ContainerReportsRequestProto containerReport) {
executorService.submit(processContainerReport(containerReport));
}
private Runnable processContainerReport(ContainerReportsProto reports) {
private Runnable processContainerReport(
ContainerReportsRequestProto reports) {
return () -> {
DatanodeID datanodeID =
DatanodeID.getFromProtoBuf(reports.getDatanodeID());

View File

@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
@ -32,18 +33,21 @@ public class HeartbeatQueueItem {
private DatanodeID datanodeID;
private long recvTimestamp;
private SCMNodeReport nodeReport;
private ReportState containerReportState;
/**
*
* @param datanodeID - datanode ID of the heartbeat.
* @param recvTimestamp - heartbeat receive timestamp.
* @param nodeReport - node report associated with the heartbeat if any.
* @param containerReportState - container report state.
*/
HeartbeatQueueItem(DatanodeID datanodeID, long recvTimestamp,
SCMNodeReport nodeReport) {
SCMNodeReport nodeReport, ReportState containerReportState) {
this.datanodeID = datanodeID;
this.recvTimestamp = recvTimestamp;
this.nodeReport = nodeReport;
this.containerReportState = containerReportState;
}
/**
@ -60,6 +64,13 @@ public SCMNodeReport getNodeReport() {
return nodeReport;
}
/**
* @return container report state.
*/
public ReportState getContainerReportState() {
return containerReportState;
}
/**
* @return heartbeat receive timestamp.
*/
@ -73,6 +84,7 @@ public long getRecvTimestamp() {
public static class Builder {
private DatanodeID datanodeID;
private SCMNodeReport nodeReport;
private ReportState containerReportState;
private long recvTimestamp = monotonicNow();
public Builder setDatanodeID(DatanodeID datanodeId) {
@ -85,6 +97,11 @@ public Builder setNodeReport(SCMNodeReport scmNodeReport) {
return this;
}
public Builder setContainerReportState(ReportState crs) {
this.containerReportState = crs;
return this;
}
@VisibleForTesting
public Builder setRecvTimestamp(long recvTime) {
this.recvTimestamp = recvTime;
@ -92,7 +109,8 @@ public Builder setRecvTimestamp(long recvTime) {
}
public HeartbeatQueueItem build() {
return new HeartbeatQueueItem(datanodeID, recvTimestamp, nodeReport);
return new HeartbeatQueueItem(datanodeID, recvTimestamp, nodeReport,
containerReportState);
}
}
}

View File

@ -31,6 +31,8 @@
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
.ErrorCode;
@ -769,12 +771,13 @@ private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) {
*
* @param datanodeID - Datanode ID.
* @param nodeReport - node report.
* @param containerReportState - container report state.
* @return SCMheartbeat response.
* @throws IOException
*/
@Override
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) {
SCMNodeReport nodeReport, ReportState containerReportState) {
// Checking for NULL to make sure that we don't get
// an exception from ConcurrentList.
@ -785,6 +788,7 @@ public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
new HeartbeatQueueItem.Builder()
.setDatanodeID(datanodeID)
.setNodeReport(nodeReport)
.setContainerReportState(containerReportState)
.build());
} else {
LOG.error("Datanode ID in heartbeat is null");

View File

@ -19,12 +19,30 @@
import com.google.common.base.Strings;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
/**
* An utility class to filter levelDB keys.
*/
public class MetadataKeyFilters {
public final class MetadataKeyFilters {
private static KeyPrefixFilter deletingKeyFilter =
new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
private static KeyPrefixFilter normalKeyFilter =
new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX,
true);
private MetadataKeyFilters() {
}
public static KeyPrefixFilter getDeletingKeyFilter() {
return deletingKeyFilter;
}
public static KeyPrefixFilter getNormalKeyFilter() {
return normalKeyFilter;
}
/**
* Interface for levelDB key filters.
*/
@ -57,25 +75,34 @@ public static class KeyPrefixFilter implements MetadataKeyFilter {
private String keyPrefix = null;
private int keysScanned = 0;
private int keysHinted = 0;
private Boolean negative;
public KeyPrefixFilter(String keyPrefix) {
this(keyPrefix, false);
}
public KeyPrefixFilter(String keyPrefix, boolean negative) {
this.keyPrefix = keyPrefix;
this.negative = negative;
}
@Override
public boolean filterKey(byte[] preKey, byte[] currentKey,
byte[] nextKey) {
keysScanned++;
boolean accept = false;
if (Strings.isNullOrEmpty(keyPrefix)) {
return true;
accept = true;
} else {
if (currentKey != null &&
DFSUtil.bytes2String(currentKey).startsWith(keyPrefix)) {
keysHinted++;
return true;
accept = true;
} else {
accept = false;
}
return false;
}
return (negative) ? !accept : accept;
}
@Override

View File

@ -96,7 +96,13 @@ message ContainerInfo {
required string containerName = 1;
required string finalhash = 2;
optional int64 size = 3;
optional int64 keycount = 4;
optional int64 used = 4;
optional int64 keyCount = 5;
// TODO: move the io count to separate message
optional int64 readCount = 6;
optional int64 writeCount = 7;
optional int64 readBytes = 8;
optional int64 writeBytes = 9;
}
// The deleted blocks which are stored in deletedBlock.db of scm.
@ -112,7 +118,7 @@ message DeletedBlocksTransaction {
A set of container reports, max count is generally set to
8192 since that keeps the size of the reports under 1 MB.
*/
message ContainerReportsProto {
message ContainerReportsRequestProto {
enum reportType {
fullReport = 0;
deltaReport = 1;
@ -122,6 +128,9 @@ message ContainerReportsProto {
required reportType type = 3;
}
message ContainerReportsResponseProto {
}
/**
* This message is send along with the heart beat to report datanode
* storage utilization by SCM.
@ -337,7 +346,7 @@ service StorageContainerDatanodeProtocolService {
send container reports sends the container report to SCM. This will
return a null command as response.
*/
rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
rpc sendContainerReport(ContainerReportsRequestProto) returns (ContainerReportsResponseProto);
/**
* Sends the block deletion ACK to SCM.

View File

@ -345,6 +345,14 @@
etc. This picks one of those for this cluster.
</description>
</property>
<property>
<name>ozone.container.report.interval.ms</name>
<value>60000</value>
<tag>OZONE, CONTAINER, MANAGEMENT</tag>
<description>Time interval in milliseconds of the datanode to send container
report. Each datanode periodically send container report upon receive
sendContainerReport from SCM.</description>
</property>
<!--Ozone Settings-->
<property>
<name>ozone.administrators</name>

View File

@ -19,7 +19,7 @@
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
@ -58,9 +58,9 @@ public ReplicationDatanodeStateManager(NodeManager nodeManager,
* @param dataNodeCount - Datanode Count.
* @return List of Container Reports.
*/
public List<ContainerReportsProto> getContainerReport(String containerName,
String poolName, int dataNodeCount) {
List<ContainerReportsProto> containerList = new LinkedList<>();
public List<ContainerReportsRequestProto> getContainerReport(
String containerName, String poolName, int dataNodeCount) {
List<ContainerReportsRequestProto> containerList = new LinkedList<>();
List<DatanodeID> nodesInPool = poolManager.getNodes(poolName);
if (nodesInPool == null) {
@ -81,10 +81,10 @@ public List<ContainerReportsProto> getContainerReport(String containerName,
.setContainerName(containerName)
.setFinalhash(DigestUtils.sha256Hex(containerName))
.build();
ContainerReportsProto containerReport = ContainerReportsProto
.newBuilder().addReports(info)
ContainerReportsRequestProto containerReport =
ContainerReportsRequestProto.newBuilder().addReports(info)
.setDatanodeID(id.getProtoBufMessage())
.setType(ContainerReportsProto.reportType.fullReport)
.setType(ContainerReportsRequestProto.reportType.fullReport)
.build();
containerList.add(containerReport);
}

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
@ -276,11 +278,12 @@ public SCMCommand register(DatanodeID datanodeID) {
*
* @param datanodeID - Datanode ID.
* @param nodeReport - node report.
* @param containerReportState - container report state.
* @return SCMheartbeat response list
*/
@Override
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) {
SCMNodeReport nodeReport, ReportState containerReportState) {
return null;
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
@ -30,8 +31,11 @@
import org.apache.hadoop.ozone.scm.VersionInfo;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@ -44,8 +48,10 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
private AtomicInteger rpcCount = new AtomicInteger(0);
private ReportState reportState;
private AtomicInteger containerReportsCount = new AtomicInteger(0);
private AtomicInteger closedContainerCount = new AtomicInteger(0);
// Map of datanode to containers
private Map<DatanodeID, Map<String, ContainerInfo>> nodeContainers =
new HashMap();
/**
* Returns the number of heartbeats made to this class.
*
@ -91,11 +97,37 @@ public int getContainerReportsCount() {
}
/**
* Returns the number of closed containers that have been reported so far.
* @return - count of closed containers.
* Returns the number of containers that have been reported so far.
* @return - count of reported containers.
*/
public int getClosedContainerCount() {
return closedContainerCount.get();
public long getContainerCount() {
return nodeContainers.values().parallelStream().mapToLong((containerMap)->{
return containerMap.size();
}).sum();
}
/**
* Get the number keys reported from container reports.
* @return - number of keys reported.
*/
public long getKeyCount() {
return nodeContainers.values().parallelStream().mapToLong((containerMap)->{
return containerMap.values().parallelStream().mapToLong((container) -> {
return container.getKeyCount();
}).sum();
}).sum();
}
/**
* Get the number of bytes used from container reports.
* @return - number of bytes used.
*/
public long getBytesUsed() {
return nodeContainers.values().parallelStream().mapToLong((containerMap)->{
return containerMap.values().parallelStream().mapToLong((container) -> {
return container.getUsed();
}).sum();
}).sum();
}
/**
@ -178,16 +210,28 @@ private void sleepIfNeeded() {
* @throws IOException
*/
@Override
public SCMHeartbeatResponseProto
public StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto
sendContainerReport(StorageContainerDatanodeProtocolProtos
.ContainerReportsProto reports) throws IOException {
.ContainerReportsRequestProto reports) throws IOException {
Preconditions.checkNotNull(reports);
containerReportsCount.incrementAndGet();
closedContainerCount.addAndGet(reports.getReportsCount());
List<SCMCommandResponseProto>
cmdResponses = new LinkedList<>();
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
.build();
DatanodeID datanode = DatanodeID.getFromProtoBuf(reports.getDatanodeID());
if (reports.getReportsCount() > 0) {
Map containers = nodeContainers.get(datanode);
if (containers == null) {
containers = new LinkedHashMap();
nodeContainers.put(datanode, containers);
}
for (StorageContainerDatanodeProtocolProtos.ContainerInfo report:
reports.getReportsList()) {
containers.put(report.getContainerName(), report);
}
}
return StorageContainerDatanodeProtocolProtos
.ContainerReportsResponseProto.newBuilder().build();
}
@Override
@ -200,4 +244,18 @@ public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
public ReportState getReportState() {
return this.reportState;
}
/**
* Reset the mock Scm for test to get a fresh start without rebuild MockScm.
*/
public void reset() {
heartbeatCount.set(0);
rpcCount.set(0);
reportState = ReportState.newBuilder()
.setState(ReportState.states.noContainerReports)
.setCount(0).build();
containerReportsCount.set(0);
nodeContainers.clear();
}
}

View File

@ -20,6 +20,7 @@
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@ -112,7 +113,7 @@ private ContainerManager createContainerManager(Configuration conf)
ContainerManager containerManager = new ContainerManagerImpl();
List<StorageLocation> pathLists = new LinkedList<>();
pathLists.add(StorageLocation.parse(containersDir.getAbsolutePath()));
containerManager.init(conf, pathLists);
containerManager.init(conf, pathLists, DFSTestUtil.getLocalDatanodeID());
return containerManager;
}
@ -126,7 +127,7 @@ private void createToDeleteBlocks(ContainerManager mgr,
int numOfChunksPerBlock, File chunkDir) throws IOException {
for (int x = 0; x < numOfContainers; x++) {
String containerName = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName);
ContainerData data = new ContainerData(containerName, conf);
mgr.createContainer(createSingleNodePipeline(containerName), data);
data = mgr.readContainer(containerName);
MetadataStore metadata = KeyUtils.getDB(data, conf);

View File

@ -98,10 +98,14 @@ public void setUp() throws Exception {
.getTempPath(TestDatanodeStateMachine.class.getSimpleName());
testRoot = new File(path);
if (!testRoot.mkdirs()) {
LOG.info("Required directories already exist.");
LOG.info("Required directories {} already exist.", testRoot);
}
File dataDir = new File(testRoot, "data");
conf.set(DFS_DATANODE_DATA_DIR_KEY, dataDir.getAbsolutePath());
if (!dataDir.mkdirs()) {
LOG.info("Data dir create failed.");
}
conf.set(DFS_DATANODE_DATA_DIR_KEY,
new File(testRoot, "data").getAbsolutePath());
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
new File(testRoot, "scm").getAbsolutePath());
path = Paths.get(path.toString(),
@ -334,6 +338,7 @@ public void testDatanodeStateMachineWithInvalidConfiguration()
confList.forEach((entry) -> {
Configuration perTestConf = new Configuration(conf);
perTestConf.setStrings(entry.getKey(), entry.getValue());
LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
DFSTestUtil.getLocalDatanodeID(), perTestConf)) {
DatanodeStateMachine.DatanodeStates currentState =

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine;
@ -39,6 +40,10 @@
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto
@ -97,8 +102,9 @@ public static void setUp() throws Exception {
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
scmServerImpl, serverAddress, 10);
testDir = PathUtils.getTestDir(TestEndPoint.class);
defaultReportState = StorageContainerDatanodeProtocolProtos.ReportState.
newBuilder().setState(noContainerReports).setCount(0).build();
defaultReportState = StorageContainerDatanodeProtocolProtos.
ReportState.newBuilder().setState(noContainerReports).
setCount(0).build();
}
@Test
@ -368,11 +374,11 @@ ContainerReport getRandomContainerReport() {
* @param count - The number of closed containers to create.
* @return ContainerReportsProto
*/
StorageContainerDatanodeProtocolProtos.ContainerReportsProto
StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto
createDummyContainerReports(int count) {
StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder
StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
reportsBuilder = StorageContainerDatanodeProtocolProtos
.ContainerReportsProto.newBuilder();
.ContainerReportsRequestProto.newBuilder();
for (int x = 0; x < count; x++) {
reportsBuilder.addReports(getRandomContainerReport()
.getProtoBufMessage());
@ -380,7 +386,7 @@ ContainerReport getRandomContainerReport() {
reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID()
.getProtoBufMessage());
reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
.ContainerReportsProto.reportType.fullReport);
.ContainerReportsRequestProto.reportType.fullReport);
return reportsBuilder.build();
}
@ -391,15 +397,63 @@ ContainerReport getRandomContainerReport() {
@Test
public void testContainerReportSend() throws Exception {
final int count = 1000;
scmServerImpl.reset();
try (EndpointStateMachine rpcEndPoint =
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
SCMHeartbeatResponseProto responseProto = rpcEndPoint
ContainerReportsResponseProto responseProto = rpcEndPoint
.getEndPoint().sendContainerReport(createDummyContainerReports(
count));
Assert.assertNotNull(responseProto);
}
Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
Assert.assertEquals(count, scmServerImpl.getClosedContainerCount());
Assert.assertEquals(count, scmServerImpl.getContainerCount());
}
/**
* Tests that rpcEndpoint sendContainerReport works as expected.
* @throws Exception
*/
@Test
public void testContainerReport() throws Exception {
final int count = 1000;
scmServerImpl.reset();
try (EndpointStateMachine rpcEndPoint =
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
ContainerReportsResponseProto responseProto = rpcEndPoint
.getEndPoint().sendContainerReport(createContainerReport(count));
Assert.assertNotNull(responseProto);
}
Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
Assert.assertEquals(count, scmServerImpl.getContainerCount());
final long expectedKeyCount = count * 1000;
Assert.assertEquals(expectedKeyCount, scmServerImpl.getKeyCount());
final long expectedBytesUsed = count * OzoneConsts.GB * 2;
Assert.assertEquals(expectedBytesUsed, scmServerImpl.getBytesUsed());
}
private ContainerReportsRequestProto createContainerReport(int count) {
StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
reportsBuilder = StorageContainerDatanodeProtocolProtos
.ContainerReportsRequestProto.newBuilder();
for (int x = 0; x < count; x++) {
ContainerReport report = new ContainerReport(UUID.randomUUID().toString(),
DigestUtils.sha256Hex("Simulated"));
report.setKeyCount(1000);
report.setSize(OzoneConsts.GB * 5);
report.setBytesUsed(OzoneConsts.GB * 2);
report.setReadCount(100);
report.setReadBytes(OzoneConsts.GB * 1);
report.setWriteCount(50);
report.setWriteBytes(OzoneConsts.GB * 2);
reportsBuilder.addReports(report.getProtoBufMessage());
}
reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID()
.getProtoBufMessage());
reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
.ContainerReportsRequestProto.reportType.fullReport);
return reportsBuilder.build();
}
}

View File

@ -28,7 +28,9 @@
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
@ -87,12 +89,12 @@ public void testRandomChoosingPolicy() throws IOException {
List<StorageLocation> pathLists = new LinkedList<>();
pathLists.add(StorageLocation.parse(containerDir.getAbsolutePath()));
containerManager = new ContainerManagerImpl();
containerManager.init(conf, pathLists);
containerManager.init(conf, pathLists, DFSTestUtil.getLocalDatanodeID());
int numContainers = 10;
for (int i = 0; i < numContainers; i++) {
String containerName = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName);
ContainerData data = new ContainerData(containerName, conf);
containerManager.createContainer(createSingleNodePipeline(containerName),
data);
Assert.assertTrue(
@ -133,7 +135,8 @@ public void testTopNOrderedChoosingPolicy() throws IOException {
List<StorageLocation> pathLists = new LinkedList<>();
pathLists.add(StorageLocation.parse(containerDir.getAbsolutePath()));
containerManager = new ContainerManagerImpl();
containerManager.init(conf, pathLists);
DatanodeID datanodeID = DFSTestUtil.getLocalDatanodeID();
containerManager.init(conf, pathLists, datanodeID);
int numContainers = 10;
Random random = new Random();
@ -141,7 +144,7 @@ public void testTopNOrderedChoosingPolicy() throws IOException {
// create [numContainers + 1] containers
for (int i = 0; i <= numContainers; i++) {
String containerName = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName);
ContainerData data = new ContainerData(containerName, conf);
containerManager.createContainer(createSingleNodePipeline(containerName),
data);
Assert.assertTrue(
@ -169,7 +172,7 @@ public void testTopNOrderedChoosingPolicy() throws IOException {
containerManager.writeLock();
containerManager.shutdown();
containerManager.writeUnlock();
containerManager.init(conf, pathLists);
containerManager.init(conf, pathLists, datanodeID);
List<ContainerData> result0 = containerManager
.chooseContainerForBlockDeletion(5);

View File

@ -21,6 +21,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -143,11 +144,25 @@ public void setupPaths() throws IOException {
loc.getNormalizedUri());
}
pathLists.add(loc);
containerManager.init(conf, pathLists);
for (String dir : conf.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
StorageLocation location = StorageLocation.parse(dir);
FileUtils.forceMkdir(new File(location.getNormalizedUri()));
}
containerManager.init(conf, pathLists, DFSTestUtil.getLocalDatanodeID());
}
@After
public void cleanupDir() throws IOException {
// Shutdown containerManager
containerManager.writeLock();
try {
containerManager.shutdown();
} finally {
containerManager.writeUnlock();
}
// Clean up SCM metadata
log.info("Deletting {}", path);
FileUtils.deleteDirectory(new File(path));
@ -163,7 +178,7 @@ public void cleanupDir() throws IOException {
public void testCreateContainer() throws Exception {
String containerName = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName);
ContainerData data = new ContainerData(containerName, conf);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
containerManager.createContainer(createSingleNodePipeline(containerName),
@ -199,7 +214,7 @@ public void testCreateContainer() throws Exception {
public void testCreateDuplicateContainer() throws Exception {
String containerName = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName);
ContainerData data = new ContainerData(containerName, conf);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
containerManager.createContainer(createSingleNodePipeline(containerName),
@ -219,14 +234,14 @@ public void testDeleteContainer() throws Exception {
String containerName2 = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName1);
ContainerData data = new ContainerData(containerName1, conf);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
containerManager.createContainer(createSingleNodePipeline(containerName1),
data);
containerManager.closeContainer(containerName1);
data = new ContainerData(containerName2);
data = new ContainerData(containerName2, conf);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
containerManager.createContainer(createSingleNodePipeline(containerName2),
@ -246,7 +261,7 @@ public void testDeleteContainer() throws Exception {
// Let us make sure that we are able to re-use a container name after
// delete.
data = new ContainerData(containerName1);
data = new ContainerData(containerName1, conf);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
containerManager.createContainer(createSingleNodePipeline(containerName1),
@ -284,7 +299,7 @@ public void testGetContainerReports() throws Exception{
for (int i = 0; i < count; i++) {
String containerName = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName);
ContainerData data = new ContainerData(containerName, conf);
containerManager.createContainer(createSingleNodePipeline(containerName),
data);
@ -321,7 +336,7 @@ public void testListContainer() throws IOException {
for (int x = 0; x < count; x++) {
String containerName = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName);
ContainerData data = new ContainerData(containerName, conf);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
containerManager.createContainer(createSingleNodePipeline(containerName),
@ -355,7 +370,7 @@ private ChunkInfo writeChunkHelper(String containerName, String keyName,
NoSuchAlgorithmException {
final int datalen = 1024;
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner", "bilbo");
if(!containerManager.getContainerMap()
@ -404,7 +419,7 @@ public void testWritReadManyChunks() throws IOException,
Map<String, ChunkInfo> fileHashMap = new HashMap<>();
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");
containerManager.createContainer(pipeline, cData);
@ -468,7 +483,7 @@ public void testPartialRead() throws Exception {
Pipeline pipeline = createSingleNodePipeline(containerName);
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");
containerManager.createContainer(pipeline, cData);
@ -503,7 +518,7 @@ public void testOverWrite() throws IOException,
Pipeline pipeline = createSingleNodePipeline(containerName);
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");
containerManager.createContainer(pipeline, cData);
@ -521,6 +536,11 @@ public void testOverWrite() throws IOException,
// With the overwrite flag it should work now.
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
chunkManager.writeChunk(pipeline, keyName, info, data);
long bytesUsed = containerManager.getBytesUsed(containerName);
Assert.assertEquals(datalen, bytesUsed);
long bytesWrite = containerManager.getWriteBytes(containerName);
Assert.assertEquals(datalen * 2, bytesWrite);
}
/**
@ -541,7 +561,7 @@ public void testMultipleWriteSingleRead() throws IOException,
Pipeline pipeline = createSingleNodePipeline(containerName);
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");
containerManager.createContainer(pipeline, cData);
@ -580,7 +600,7 @@ public void testDeleteChunk() throws IOException,
Pipeline pipeline = createSingleNodePipeline(containerName);
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName);
ContainerData cData = new ContainerData(containerName, conf);
cData.addMetadata("VOLUME", "shire");
cData.addMetadata("owner)", "bilbo");
containerManager.createContainer(pipeline, cData);
@ -626,22 +646,35 @@ public void testPutKey() throws IOException, NoSuchAlgorithmException {
@Test
public void testPutKeyWithLotsOfChunks() throws IOException,
NoSuchAlgorithmException {
final int chunkCount = 1024;
final int chunkCount = 2;
final int datalen = 1024;
long totalSize = 0L;
String containerName = OzoneUtils.getRequestID();
String keyName = OzoneUtils.getRequestID();
Pipeline pipeline = createSingleNodePipeline(containerName);
List<ChunkInfo> chunkList = new LinkedList<>();
ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
totalSize += datalen;
chunkList.add(info);
for (int x = 1; x < chunkCount; x++) {
// with holes in the front (before x * datalen)
info = getChunk(keyName, x, x * datalen, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(pipeline, keyName, info, data);
totalSize += datalen * (x + 1);
chunkList.add(info);
}
long bytesUsed = containerManager.getBytesUsed(containerName);
Assert.assertEquals(totalSize, bytesUsed);
long writeBytes = containerManager.getWriteBytes(containerName);
Assert.assertEquals(chunkCount * datalen, writeBytes);
long readCount = containerManager.getReadCount(containerName);
Assert.assertEquals(0, readCount);
long writeCount = containerManager.getWriteCount(containerName);
Assert.assertEquals(chunkCount, writeCount);
KeyData keyData = new KeyData(containerName, keyName);
List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
for (ChunkInfo i : chunkList) {
@ -713,7 +746,7 @@ public void testDeleteKeyTwice() throws IOException,
@Test
public void testUpdateContainer() throws IOException {
String containerName = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName);
ContainerData data = new ContainerData(containerName, conf);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner", "bilbo");
@ -724,7 +757,7 @@ public void testUpdateContainer() throws IOException {
File orgContainerFile = containerManager.getContainerFile(data);
Assert.assertTrue(orgContainerFile.exists());
ContainerData newData = new ContainerData(containerName);
ContainerData newData = new ContainerData(containerName, conf);
newData.addMetadata("VOLUME", "shire_new");
newData.addMetadata("owner", "bilbo_new");
@ -757,7 +790,7 @@ public void testUpdateContainer() throws IOException {
ContainerProtos.ContainerData actualContainerDataProto =
ContainerProtos.ContainerData.parseDelimitedFrom(newIn);
ContainerData actualContainerData = ContainerData
.getFromProtBuf(actualContainerDataProto);
.getFromProtBuf(actualContainerDataProto, conf);
Assert.assertEquals(actualContainerData.getAllMetadata().get("VOLUME"),
"shire_new");
Assert.assertEquals(actualContainerData.getAllMetadata().get("owner"),
@ -776,7 +809,7 @@ public void testUpdateContainer() throws IOException {
}
// Update with force flag, it should be success.
newData = new ContainerData(containerName);
newData = new ContainerData(containerName, conf);
newData.addMetadata("VOLUME", "shire_new_1");
newData.addMetadata("owner", "bilbo_new_1");
containerManager.updateContainer(createSingleNodePipeline(containerName),

View File

@ -27,7 +27,7 @@
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.container.replication
.ContainerReplicationManager;
import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;
@ -145,7 +145,7 @@ public void testDetectSingleContainerReplica() throws TimeoutException,
String threeNodeContainer = "ThreeNodeContainer";
InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
// Only single datanode reporting that "SingleNodeContainer" exists.
List<ContainerReportsProto> clist =
List<ContainerReportsRequestProto> clist =
datanodeStateManager.getContainerReport(singleNodeContainer,
ppool.getPool().getPoolName(), 1);
ppool.handleContainerReport(clist.get(0));
@ -154,7 +154,7 @@ public void testDetectSingleContainerReplica() throws TimeoutException,
clist = datanodeStateManager.getContainerReport(threeNodeContainer,
ppool.getPool().getPoolName(), 3);
for (ContainerReportsProto reportsProto : clist) {
for (ContainerReportsRequestProto reportsProto : clist) {
ppool.handleContainerReport(reportsProto);
}
GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4,
@ -181,7 +181,7 @@ public void testDetectOverReplica() throws TimeoutException,
String wayOverReplicated = "WayOverReplicated";
InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
List<ContainerReportsProto> clist =
List<ContainerReportsRequestProto> clist =
datanodeStateManager.getContainerReport(normalContainer,
ppool.getPool().getPoolName(), 3);
ppool.handleContainerReport(clist.get(0));
@ -189,14 +189,14 @@ public void testDetectOverReplica() throws TimeoutException,
clist = datanodeStateManager.getContainerReport(overReplicated,
ppool.getPool().getPoolName(), 4);
for (ContainerReportsProto reportsProto : clist) {
for (ContainerReportsRequestProto reportsProto : clist) {
ppool.handleContainerReport(reportsProto);
}
clist = datanodeStateManager.getContainerReport(wayOverReplicated,
ppool.getPool().getPoolName(), 7);
for (ContainerReportsProto reportsProto : clist) {
for (ContainerReportsRequestProto reportsProto : clist) {
ppool.handleContainerReport(reportsProto);
}
@ -249,7 +249,7 @@ public void testAddingNewPoolWorks()
// Assert that we are able to send a container report to this new
// pool and datanode.
List<ContainerReportsProto> clist =
List<ContainerReportsRequestProto> clist =
datanodeStateManager.getContainerReport("NewContainer1",
"PoolNew", 1);
replicationManager.handleContainerReport(clist.get(0));

View File

@ -166,7 +166,7 @@ public void testDeleteContainer() throws Exception {
pipeline = scm.allocateContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE,
containerName);
containerData = new ContainerData(containerName);
containerData = new ContainerData(containerName, conf);
containerManager.createContainer(pipeline, containerData);
ContainerData cdata = containerManager.readContainer(containerName);
KeyUtils.getDB(cdata, conf).put(containerName.getBytes(),
@ -207,7 +207,7 @@ public void testDeleteContainer() throws Exception {
containerName = "empty-container";
pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
containerData = new ContainerData(containerName);
containerData = new ContainerData(containerName, conf);
containerManager.createContainer(pipeline, containerData);
containerManager.closeContainer(containerName);
Assert.assertTrue(containerExist(containerName));
@ -271,7 +271,7 @@ public void testInfoContainer() throws Exception {
cname = "ContainerTestInfo1";
Pipeline pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), cname);
ContainerData data = new ContainerData(cname);
ContainerData data = new ContainerData(cname, conf);
containerManager.createContainer(pipeline, data);
info = new String[]{"-container", "-info", "-c", cname};
@ -292,7 +292,7 @@ public void testInfoContainer() throws Exception {
cname = "ContainerTestInfo2";
pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), cname);
data = new ContainerData(cname);
data = new ContainerData(cname, conf);
containerManager.createContainer(pipeline, data);
KeyUtils.getDB(data, conf).put(cname.getBytes(),
"someKey".getBytes());
@ -313,7 +313,7 @@ public void testInfoContainer() throws Exception {
cname = "ContainerTestInfo3";
pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), cname);
data = new ContainerData(cname);
data = new ContainerData(cname, conf);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner", "bilbo");
containerManager.createContainer(pipeline, data);
@ -378,7 +378,7 @@ public void testListContainerCommand() throws Exception {
String containerName = String.format("%s%02d", prefix, index);
Pipeline pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
ContainerData data = new ContainerData(containerName);
ContainerData data = new ContainerData(containerName, conf);
containerManager.createContainer(pipeline, data);
}

View File

@ -24,7 +24,12 @@
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
@ -324,8 +329,7 @@ public void run() {
* datanode.
*/
@Override
public VersionResponse getVersion(StorageContainerDatanodeProtocolProtos
.SCMVersionRequestProto versionRequest) {
public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
return null;
}
@ -347,11 +351,12 @@ public SCMCommand register(DatanodeID datanodeID) {
*
* @param datanodeID - Datanode ID.
* @param nodeReport - node report.
* @param containerReportState - container report state.
* @return SCMheartbeat response list
*/
@Override
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) {
SCMNodeReport nodeReport, ReportState containerReportState) {
if ((datanodeID != null) && (nodeReport != null) && (nodeReport
.getStorageReportCount() > 0)) {
SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString());
@ -359,10 +364,8 @@ public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
long totalCapacity = 0L;
long totalRemaining = 0L;
long totalScmUsed = 0L;
List<StorageContainerDatanodeProtocolProtos.SCMStorageReport>
storageReports = nodeReport.getStorageReportList();
for (StorageContainerDatanodeProtocolProtos.SCMStorageReport report :
storageReports) {
List<SCMStorageReport> storageReports = nodeReport.getStorageReportList();
for (SCMStorageReport report : storageReports) {
totalCapacity += report.getCapacity();
totalRemaining += report.getRemaining();
totalScmUsed += report.getScmUsed();

View File

@ -28,7 +28,11 @@
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
@ -64,6 +68,11 @@ public class TestContainerPlacement {
public ExpectedException thrown = ExpectedException.none();
private static XceiverClientManager xceiverClientManager =
new XceiverClientManager(new OzoneConfiguration());
private ReportState reportState = ReportState.newBuilder()
.setState(ReportState.states.noContainerReports)
.setCount(0).build();
/**
* Returns a new copy of Configuration.
*
@ -128,16 +137,13 @@ public void testContainerPlacementCapacity() throws IOException,
SCMTestUtils.getRegisteredDatanodeIDs(nodeManager, nodeCount);
try {
for (DatanodeID datanodeID : datanodes) {
StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb =
StorageContainerDatanodeProtocolProtos.SCMNodeReport.newBuilder();
StorageContainerDatanodeProtocolProtos.SCMStorageReport.Builder srb =
StorageContainerDatanodeProtocolProtos.SCMStorageReport
.newBuilder();
SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
srb.setStorageUuid(UUID.randomUUID().toString());
srb.setCapacity(capacity).setScmUsed(used).
setRemaining(remaining).build();
nodeManager.sendHeartbeat(datanodeID,
nrb.addStorageReport(srb).build());
nrb.addStorageReport(srb).build(), reportState);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
@ -164,16 +170,13 @@ public void testContainerPlacementCapacity() throws IOException,
final long newRemaining = capacity - newUsed;
for (DatanodeID datanodeID : datanodes) {
StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb =
StorageContainerDatanodeProtocolProtos.SCMNodeReport.newBuilder();
StorageContainerDatanodeProtocolProtos.SCMStorageReport.Builder srb =
StorageContainerDatanodeProtocolProtos.SCMStorageReport
.newBuilder();
SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
srb.setStorageUuid(UUID.randomUUID().toString());
srb.setCapacity(capacity).setScmUsed(newUsed).
setRemaining(newRemaining).build();
nodeManager.sendHeartbeat(datanodeID,
nrb.addStorageReport(srb).build());
nrb.addStorageReport(srb).build(), reportState);
}
GenericTestUtils.waitFor(() -> nodeManager.getStats().getRemaining()

View File

@ -25,6 +25,8 @@
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
@ -77,6 +79,10 @@ public class TestNodeManager {
private File testDir;
private ReportState reportState = ReportState.newBuilder()
.setState(ReportState.states.noContainerReports)
.setCount(0).build();
@Rule
public ExpectedException thrown = ExpectedException.none();
@ -141,7 +147,7 @@ public void testScmHeartbeat() throws IOException,
// Send some heartbeats from different nodes.
for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID, null);
nodeManager.sendHeartbeat(datanodeID, null, reportState);
}
// Wait for 4 seconds max.
@ -187,7 +193,8 @@ public void testScmNotEnoughHeartbeats() throws IOException,
// Need 100 nodes to come out of chill mode, only one node is sending HB.
nodeManager.setMinimumChillModeNodes(100);
nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager), null);
nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager),
null, reportState);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have" +
@ -213,7 +220,7 @@ public void testScmSameNodeHeartbeats() throws IOException,
// Send 10 heartbeat from same node, and assert we never leave chill mode.
for (int x = 0; x < 10; x++) {
nodeManager.sendHeartbeat(datanodeID, null);
nodeManager.sendHeartbeat(datanodeID, null, reportState);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
@ -242,7 +249,7 @@ public void testScmShutdown() throws IOException, InterruptedException,
nodeManager.close();
// These should never be processed.
nodeManager.sendHeartbeat(datanodeID, null);
nodeManager.sendHeartbeat(datanodeID, null, reportState);
// Let us just wait for 2 seconds to prove that HBs are not processed.
Thread.sleep(2 * 1000);
@ -264,7 +271,8 @@ public void testScmHeartbeatAfterRestart() throws Exception {
DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
nodemanager.register(datanodeID);
List<SCMCommand> command = nodemanager.sendHeartbeat(datanodeID, null);
List<SCMCommand> command = nodemanager.sendHeartbeat(datanodeID,
null, reportState);
Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeID));
Assert.assertTrue("On regular HB calls, SCM responses a "
+ "datanode with an empty command list", command.isEmpty());
@ -282,7 +290,8 @@ public void testScmHeartbeatAfterRestart() throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
List<SCMCommand> command =
nodemanager.sendHeartbeat(datanodeID, null);
nodemanager.sendHeartbeat(datanodeID, null,
reportState);
return command.size() == 1 && command.get(0).getType()
.equals(Type.reregisterCommand);
}
@ -312,7 +321,7 @@ public void testScmHealthyNodeCount() throws IOException,
for (int x = 0; x < count; x++) {
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID, null);
nodeManager.sendHeartbeat(datanodeID, null, reportState);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
@ -400,18 +409,18 @@ public void testScmDetectStaleAndDeadNode() throws IOException,
DatanodeID staleNode = SCMTestUtils.getDatanodeID(nodeManager);
// Heartbeat once
nodeManager.sendHeartbeat(staleNode, null);
nodeManager.sendHeartbeat(staleNode, null, reportState);
// Heartbeat all other nodes.
for (DatanodeID dn : nodeList) {
nodeManager.sendHeartbeat(dn, null);
nodeManager.sendHeartbeat(dn, null, reportState);
}
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
for (DatanodeID dn : nodeList) {
nodeManager.sendHeartbeat(dn, null);
nodeManager.sendHeartbeat(dn, null, reportState);
}
// Wait for 2 seconds, wait a total of 4 seconds to make sure that the
@ -428,7 +437,7 @@ public void testScmDetectStaleAndDeadNode() throws IOException,
// heartbeat good nodes again.
for (DatanodeID dn : nodeList) {
nodeManager.sendHeartbeat(dn, null);
nodeManager.sendHeartbeat(dn, null, reportState);
}
// 6 seconds is the dead window for this test , so we wait a total of
@ -466,7 +475,7 @@ public void testScmLogErrorOnNullDatanode() throws IOException,
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
nodeManager.sendHeartbeat(null, null);
nodeManager.sendHeartbeat(null, null, reportState);
logCapturer.stopCapturing();
assertThat(logCapturer.getOutput(),
containsString("Datanode ID in heartbeat is null"));
@ -542,9 +551,9 @@ public void testScmClusterIsInExpectedState1() throws IOException,
SCMTestUtils.getDatanodeID(nodeManager, "StaleNode");
DatanodeID deadNode =
SCMTestUtils.getDatanodeID(nodeManager, "DeadNode");
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(staleNode, null);
nodeManager.sendHeartbeat(deadNode, null);
nodeManager.sendHeartbeat(healthyNode, null, reportState);
nodeManager.sendHeartbeat(staleNode, null, reportState);
nodeManager.sendHeartbeat(deadNode, null, reportState);
// Sleep so that heartbeat processing thread gets to run.
Thread.sleep(500);
@ -570,12 +579,12 @@ public void testScmClusterIsInExpectedState1() throws IOException,
* the 3 second windows.
*/
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(staleNode, null);
nodeManager.sendHeartbeat(deadNode, null);
nodeManager.sendHeartbeat(healthyNode, null, reportState);
nodeManager.sendHeartbeat(staleNode, null, reportState);
nodeManager.sendHeartbeat(deadNode, null, reportState);
Thread.sleep(1500);
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(healthyNode, null, reportState);
Thread.sleep(2 * 1000);
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
@ -595,10 +604,10 @@ public void testScmClusterIsInExpectedState1() throws IOException,
* staleNode to move to stale state and deadNode to move to dead state.
*/
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(staleNode, null);
nodeManager.sendHeartbeat(healthyNode, null, reportState);
nodeManager.sendHeartbeat(staleNode, null, reportState);
Thread.sleep(1500);
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(healthyNode, null, reportState);
Thread.sleep(2 * 1000);
// 3.5 seconds have elapsed for stale node, so it moves into Stale.
@ -631,9 +640,9 @@ public void testScmClusterIsInExpectedState1() throws IOException,
* Cluster State : let us heartbeat all the nodes and verify that we get
* back all the nodes in healthy state.
*/
nodeManager.sendHeartbeat(healthyNode, null);
nodeManager.sendHeartbeat(staleNode, null);
nodeManager.sendHeartbeat(deadNode, null);
nodeManager.sendHeartbeat(healthyNode, null, reportState);
nodeManager.sendHeartbeat(staleNode, null, reportState);
nodeManager.sendHeartbeat(deadNode, null, reportState);
Thread.sleep(500);
//Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size());
@ -653,7 +662,7 @@ private void heartbeatNodeSet(SCMNodeManager manager, List<DatanodeID> list,
int sleepDuration) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
for (DatanodeID dn : list) {
manager.sendHeartbeat(dn, null);
manager.sendHeartbeat(dn, null, reportState);
}
Thread.sleep(sleepDuration);
}
@ -739,7 +748,7 @@ public void testScmClusterIsInExpectedState2() throws IOException,
// No Thread just one time HBs the node manager, so that these will be
// marked as dead nodes eventually.
for (DatanodeID dn : deadNodeList) {
nodeManager.sendHeartbeat(dn, null);
nodeManager.sendHeartbeat(dn, null, reportState);
}
@ -893,7 +902,7 @@ public void testScmEnterAndExitChillMode() throws IOException,
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
nodeManager.setMinimumChillModeNodes(10);
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID, null);
nodeManager.sendHeartbeat(datanodeID, null, reportState);
String status = nodeManager.getChillModeStatus();
Assert.assertThat(status, CoreMatchers.containsString("Still in chill " +
"mode, waiting on nodes to report in."));
@ -920,7 +929,7 @@ public void testScmEnterAndExitChillMode() throws IOException,
// Assert that node manager force enter cannot be overridden by nodes HBs.
for (int x = 0; x < 20; x++) {
DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanode, null);
nodeManager.sendHeartbeat(datanode, null, reportState);
}
Thread.sleep(500);
@ -963,7 +972,7 @@ public void testScmStatsFromNodeReport() throws IOException,
srb.setCapacity(capacity).setScmUsed(used).
setRemaining(capacity - used).build();
nodeManager.sendHeartbeat(datanodeID,
nrb.addStorageReport(srb).build());
nrb.addStorageReport(srb).build(), reportState);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
@ -1010,7 +1019,7 @@ public void testScmNodeReportUpdate() throws IOException,
.setRemaining(capacity - x * usedPerHeartbeat).build();
nrb.addStorageReport(srb);
nodeManager.sendHeartbeat(datanodeID, nrb.build());
nodeManager.sendHeartbeat(datanodeID, nrb.build(), reportState);
Thread.sleep(100);
}
@ -1092,7 +1101,7 @@ public void testScmNodeReportUpdate() throws IOException,
srb.setCapacity(capacity).setScmUsed(expectedScmUsed)
.setRemaining(expectedRemaining).build();
nrb.addStorageReport(srb);
nodeManager.sendHeartbeat(datanodeID, nrb.build());
nodeManager.sendHeartbeat(datanodeID, nrb.build(), reportState);
// Wait up to 5 seconds so that the dead node becomes healthy
// Verify usage info should be updated.