From a46701706801538b3527f863dc9b195466de4345 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Fri, 13 Oct 2017 10:51:29 -0700 Subject: [PATCH] HDFS-12411. Ozone: Add container usage information to DN container report. Contributed by Xiaoyu Yao. --- .../apache/hadoop/ozone/OzoneConfigKeys.java | 5 + .../proto/DatanodeContainerProtocol.proto | 4 +- .../common/helpers/ContainerData.java | 57 +++++++- .../common/helpers/ContainerReport.java | 86 +++++++++-- .../container/common/helpers/KeyData.java | 9 ++ .../common/impl/ChunkManagerImpl.java | 39 +++-- .../impl/ContainerReportManagerImpl.java | 86 +++++++++++ .../common/impl/ContainerStatus.java | 136 +++++++++++++++++- .../container/common/impl/Dispatcher.java | 4 +- .../container/common/impl/KeyManagerImpl.java | 13 +- .../common/interfaces/ContainerManager.java | 78 +++++++++- .../interfaces/ContainerReportManager.java | 32 +++++ .../statemachine/DatanodeStateMachine.java | 1 + .../ContainerReportHandler.java | 20 +-- .../container/ozoneimpl/OzoneContainer.java | 25 +++- .../ozone/ksm/KSMMetadataManagerImpl.java | 6 +- .../StorageContainerDatanodeProtocol.java | 11 +- .../StorageContainerNodeProtocol.java | 4 +- ...atanodeProtocolClientSideTranslatorPB.java | 9 +- ...atanodeProtocolServerSideTranslatorPB.java | 8 +- .../ozone/scm/StorageContainerManager.java | 18 +-- .../ContainerReplicationManager.java | 5 +- .../container/replication/InProgressPool.java | 8 +- .../ozone/scm/node/HeartbeatQueueItem.java | 22 ++- .../hadoop/ozone/scm/node/SCMNodeManager.java | 6 +- .../hadoop/utils/MetadataKeyFilters.java | 35 ++++- .../StorageContainerDatanodeProtocol.proto | 15 +- .../src/main/resources/ozone-default.xml | 8 ++ .../ReplicationDatanodeStateManager.java | 14 +- .../TestUtils/ReplicationNodeManagerMock.java | 5 +- .../ozone/container/common/ScmTestMock.java | 82 +++++++++-- .../common/TestBlockDeletingService.java | 5 +- .../common/TestDatanodeStateMachine.java | 11 +- .../ozone/container/common/TestEndPoint.java | 70 +++++++-- .../TestContainerDeletionChoosingPolicy.java | 13 +- .../common/impl/TestContainerPersistence.java | 71 ++++++--- .../TestContainerReplicationManager.java | 14 +- .../apache/hadoop/ozone/scm/TestSCMCli.java | 12 +- .../ozone/scm/container/MockNodeManager.java | 19 +-- .../scm/node/TestContainerPlacement.java | 29 ++-- .../ozone/scm/node/TestNodeManager.java | 73 +++++----- 41 files changed, 945 insertions(+), 223 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index ff123a9aae..12c2102e33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -156,6 +156,11 @@ public final class OzoneConfigKeys { public static final int OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10; + public static final String OZONE_CONTAINER_REPORT_INTERVAL_MS = + "ozone.container.report.interval.ms"; + public static final int OZONE_CONTAINER_REPORT_INTERVAL_MS_DEFAULT = + 60000; + public static final String DFS_CONTAINER_RATIS_ENABLED_KEY = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto index 424b1653a9..7fb92c9740 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto @@ -195,8 +195,6 @@ message ContainerCommandResponseProto { } - - message ContainerData { required string name = 1; repeated KeyValue metadata = 2; @@ -204,6 +202,8 @@ message ContainerData { optional string containerPath = 4; optional bool open = 5 [default = true]; optional string hash = 6; + optional int64 bytesUsed = 7; + optional int64 size = 8; } message ContainerMeta { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java index 1b70b1b323..58853edc62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java @@ -19,14 +19,18 @@ package org.apache.hadoop.ozone.container.common.helpers; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.util.Time; import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; /** * This class maintains the information about a container in the ozone world. @@ -43,16 +47,22 @@ public class ContainerData { private String containerFilePath; private boolean open; private String hash; + private AtomicLong bytesUsed; + private long maxSize; /** * Constructs a ContainerData Object. * * @param containerName - Name */ - public ContainerData(String containerName) { + public ContainerData(String containerName, Configuration conf) { this.metadata = new TreeMap<>(); this.containerName = containerName; this.open = true; + this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, + ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB; + this.bytesUsed = new AtomicLong(0L); + } /** @@ -62,8 +72,9 @@ public ContainerData(String containerName) { * @throws IOException */ public static ContainerData getFromProtBuf( - ContainerProtos.ContainerData protoData) throws IOException { - ContainerData data = new ContainerData(protoData.getName()); + ContainerProtos.ContainerData protoData, Configuration conf) + throws IOException { + ContainerData data = new ContainerData(protoData.getName(), conf); for (int x = 0; x < protoData.getMetadataCount(); x++) { data.addMetadata(protoData.getMetadata(x).getKey(), protoData.getMetadata(x).getValue()); @@ -86,6 +97,14 @@ public static ContainerData getFromProtBuf( if(protoData.hasHash()) { data.setHash(protoData.getHash()); } + + if (protoData.hasBytesUsed()) { + data.setBytesUsed(protoData.getBytesUsed()); + } + + if (protoData.hasSize()) { + data.setMaxSize(protoData.getSize()); + } return data; } @@ -120,6 +139,13 @@ public ContainerProtos.ContainerData getProtoBufMessage() { .setValue(entry.getValue()).build()); } + if (this.getBytesUsed() >= 0) { + builder.setBytesUsed(this.getBytesUsed()); + } + + if (this.getMaxSize() >= 0) { + builder.setSize(this.getMaxSize()); + } return builder.build(); } @@ -251,8 +277,6 @@ public void setHash(String hash) { this.hash = hash; } - - /** * Sets the open or closed values. * @param open @@ -261,4 +285,27 @@ public synchronized void setOpen(boolean open) { this.open = open; } + public void setMaxSize(long maxSize) { + this.maxSize = maxSize; + } + + public long getMaxSize() { + return maxSize; + } + + public long getKeyCount() { + return metadata.size(); + } + + public void setBytesUsed(long used) { + this.bytesUsed.set(used); + } + + public long addBytesUsed(long delta) { + return this.bytesUsed.addAndGet(delta); + } + + public long getBytesUsed() { + return bytesUsed.get(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java index 034cbc22b8..87a2493a06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java @@ -23,14 +23,6 @@ /** * Container Report iterates the closed containers and sends a container report * to SCM. - *

- * The protobuf counter part of this class looks like this. - * message ContainerInfo { - * required string containerName = 1; - * required string finalhash = 2; - * optional int64 size = 3; - * optional int64 keycount = 4; - * } */ public class ContainerReport { private static final int UNKNOWN = -1; @@ -38,6 +30,12 @@ public class ContainerReport { private final String finalhash; private long size; private long keyCount; + private long bytesUsed; + private long readCount; + private long writeCount; + private long readBytes; + private long writeBytes; + /** * Constructs the ContainerReport. @@ -50,6 +48,11 @@ public ContainerReport(String containerName, String finalhash) { this.finalhash = finalhash; this.size = UNKNOWN; this.keyCount = UNKNOWN; + this.bytesUsed = 0L; + this.readCount = 0L; + this.readBytes = 0L; + this.writeCount = 0L; + this.writeBytes = 0L; } /** @@ -65,9 +68,25 @@ public static ContainerReport getFromProtoBuf(ContainerInfo info) { if (info.hasSize()) { report.setSize(info.getSize()); } - if (info.hasKeycount()) { - report.setKeyCount(info.getKeycount()); + if (info.hasKeyCount()) { + report.setKeyCount(info.getKeyCount()); } + if (info.hasUsed()) { + report.setBytesUsed(info.getUsed()); + } + if (info.hasReadCount()) { + report.setReadCount(info.getReadCount()); + } + if (info.hasReadBytes()) { + report.setReadBytes(info.getReadBytes()); + } + if (info.hasWriteCount()) { + report.setWriteCount(info.getWriteCount()); + } + if (info.hasWriteBytes()) { + report.setWriteBytes(info.getWriteBytes()); + } + return report; } @@ -125,6 +144,46 @@ public void setKeyCount(long keyCount) { this.keyCount = keyCount; } + public long getReadCount() { + return readCount; + } + + public void setReadCount(long readCount) { + this.readCount = readCount; + } + + public long getWriteCount() { + return writeCount; + } + + public void setWriteCount(long writeCount) { + this.writeCount = writeCount; + } + + public long getReadBytes() { + return readBytes; + } + + public void setReadBytes(long readBytes) { + this.readBytes = readBytes; + } + + public long getWriteBytes() { + return writeBytes; + } + + public void setWriteBytes(long writeBytes) { + this.writeBytes = writeBytes; + } + + public long getBytesUsed() { + return bytesUsed; + } + + public void setBytesUsed(long bytesUsed) { + this.bytesUsed = bytesUsed; + } + /** * Gets a containerInfo protobuf message from ContainerReports. * @@ -133,8 +192,13 @@ public void setKeyCount(long keyCount) { public ContainerInfo getProtoBufMessage() { return ContainerInfo.newBuilder() .setContainerName(this.getContainerName()) - .setKeycount(this.getKeyCount()) + .setKeyCount(this.getKeyCount()) .setSize(this.getSize()) + .setUsed(this.getBytesUsed()) + .setReadCount(this.getReadCount()) + .setReadBytes(this.getReadBytes()) + .setWriteCount(this.getWriteCount()) + .setWriteBytes(this.getWriteBytes()) .setFinalhash(this.getFinalhash()) .build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java index 590513eeb8..639de166f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java @@ -158,4 +158,13 @@ public String getKeyName() { public void setChunks(List chunks) { this.chunks = chunks; } + + /** + * Get the total size of chunks allocated for the key. + * @return total size of the key. + */ + public long getSize() { + return chunks.parallelStream().mapToLong(e->e.getLen()).sum(); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java index cace6c91cf..b4ceb979f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java @@ -19,6 +19,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils; @@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.nio.ByteBuffer; import java.security.NoSuchAlgorithmException; import java.util.concurrent.ExecutionException; @@ -67,19 +69,24 @@ public ChunkManagerImpl(ContainerManager manager) { public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info, byte[] data) throws StorageContainerException { - // we don't want container manager to go away while we are writing chunks. containerManager.readLock(); // TODO : Take keyManager Write lock here. try { Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - Preconditions.checkNotNull(pipeline.getContainerName(), + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, "Container name cannot be null"); - File chunkFile = ChunkUtils.validateChunk(pipeline, - containerManager.readContainer(pipeline.getContainerName()), info); + ContainerData container = + containerManager.readContainer(containerName); + File chunkFile = ChunkUtils.validateChunk(pipeline, container, info); + long oldSize = chunkFile.length(); ChunkUtils.writeData(chunkFile, info, data); - + containerManager.incrWriteBytes(containerName, info.getLen()); + containerManager.incrWriteCount(containerName); + long newSize = chunkFile.length(); + containerManager.incrBytesUsed(containerName, newSize - oldSize); } catch (ExecutionException | NoSuchAlgorithmException e) { LOG.error("write data failed. error: {}", e); throw new StorageContainerException("Internal error: ", e, @@ -110,9 +117,17 @@ public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws StorageContainerException { containerManager.readLock(); try { - File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager - .readContainer(pipeline.getContainerName()), info); - return ChunkUtils.readData(chunkFile, info).array(); + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, + "Container name cannot be null"); + ContainerData container = + containerManager.readContainer(containerName); + File chunkFile = ChunkUtils.getChunkFile(pipeline, container, info); + ByteBuffer data = ChunkUtils.readData(chunkFile, info); + containerManager.incrReadCount(containerName); + containerManager.incrReadBytes(containerName, chunkFile.length()); + return data.array(); } catch (ExecutionException | NoSuchAlgorithmException e) { LOG.error("read data failed. error: {}", e); throw new StorageContainerException("Internal error: ", @@ -138,13 +153,17 @@ public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) @Override public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws StorageContainerException { - containerManager.readLock(); try { + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, + "Container name cannot be null"); File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager - .readContainer(pipeline.getContainerName()), info); + .readContainer(containerName), info); if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) { FileUtil.fullyDelete(chunkFile); + containerManager.decrBytesUsed(containerName, chunkFile.length()); } else { LOG.error("Not Supported Operation. Trying to delete a " + "chunk that is in shared file. chunk info : " + info.toString()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java new file mode 100644 index 0000000000..0e87930d4d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.impl; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.OzoneClientUtils; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerReportManager; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.util.Time; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Class wraps the container report operations on datanode. + * // TODO: support incremental/delta container report + */ +public class ContainerReportManagerImpl implements ContainerReportManager { + private Configuration config; + // Last non-empty container report time + private long lastContainerReportTime; + private final long containerReportInterval; + private AtomicLong reportCount; + private static final ReportState NO_CONTAINER_REPORTSTATE = + ReportState.newBuilder() + .setState(ReportState.states.noContainerReports) + .setCount(0).build(); + + public ContainerReportManagerImpl(Configuration config) { + this.config = config; + this.lastContainerReportTime = -1; + this.reportCount = new AtomicLong(0L); + this.containerReportInterval = config.getLong( + OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_MS, + OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_MS_DEFAULT); + } + + public ReportState getContainerReportState() { + if (lastContainerReportTime < 0) { + return getFullContainerReportState(); + } else { + // Add a random delay (0~30s) on top of the container report + // interval (60s) so tha the SCM is overwhelmed by the container reports + // sent in sync. + if (Time.monotonicNow() - lastContainerReportTime > + (containerReportInterval + getRandomReportDelay())) { + return getFullContainerReportState(); + } else { + return getNoContainerReportState(); + } + } + } + + private ReportState getFullContainerReportState() { + ReportState.Builder rsBuilder = ReportState.newBuilder(); + rsBuilder.setState(ReportState.states.completeContinerReport); + rsBuilder.setCount(reportCount.incrementAndGet()); + this.lastContainerReportTime = Time.monotonicNow(); + return rsBuilder.build(); + } + + private ReportState getNoContainerReportState() { + return NO_CONTAINER_REPORTSTATE; + } + + private long getRandomReportDelay() { + return RandomUtils.nextLong(0, + OzoneClientUtils.getScmHeartbeatInterval(config)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java index 183157d403..5577323a10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java @@ -19,8 +19,10 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import java.util.concurrent.atomic.AtomicLong; + /** - * This is an immutable class that represents the state of a container. if the + * This class represents the state of a container. if the * container reading encountered an error when we boot up we will post that * info to a recovery queue and keep the info in the containerMap. *

@@ -36,6 +38,14 @@ public class ContainerStatus { */ private int numPendingDeletionBlocks; + private AtomicLong readBytes; + + private AtomicLong writeBytes; + + private AtomicLong readCount; + + private AtomicLong writeCount; + /** * Creates a Container Status class. * @@ -44,6 +54,10 @@ public class ContainerStatus { ContainerStatus(ContainerData containerData) { this.numPendingDeletionBlocks = 0; this.containerData = containerData; + this.readCount = new AtomicLong(0L); + this.readBytes = new AtomicLong(0L); + this.writeCount = new AtomicLong(0L); + this.writeBytes = new AtomicLong(0L); } /** @@ -80,4 +94,124 @@ public void decrPendingDeletionBlocks(int numBlocks) { public int getNumPendingDeletionBlocks() { return this.numPendingDeletionBlocks; } + + /** + * Get the number of bytes read from the container. + * @return the number of bytes read from the container. + */ + public long getReadBytes() { + return readBytes.get(); + } + + /** + * Increase the number of bytes read from the container. + * @param bytes number of bytes read. + */ + public void incrReadBytes(long bytes) { + this.readBytes.addAndGet(bytes); + } + + /** + * Get the number of times the container is read. + * @return the number of times the container is read. + */ + public long getReadCount() { + return readCount.get(); + } + + /** + * Increase the number of container read count by 1. + */ + public void incrReadCount() { + this.readCount.incrementAndGet(); + } + + /** + * Get the number of bytes write into the container. + * @return the number of bytes write into the container. + */ + public long getWriteBytes() { + return writeBytes.get(); + } + + /** + * Increase the number of bytes write into the container. + * @param bytes the number of bytes write into the container. + */ + public void incrWriteBytes(long bytes) { + this.writeBytes.addAndGet(bytes); + } + + /** + * Get the number of writes into the container. + * @return the number of writes into the container. + */ + public long getWriteCount() { + return writeCount.get(); + } + + /** + * Increase the number of writes into the container by 1. + */ + public void incrWriteCount() { + this.writeCount.incrementAndGet(); + } + + /** + * Get the number of bytes used by the container. + * @return the number of bytes used by the container. + */ + public long getBytesUsed() { + return containerData.getBytesUsed(); + } + + /** + * Increase the number of bytes used by the container. + * @param used number of bytes used by the container. + * @return the current number of bytes used by the container afert increase. + */ + public long incrBytesUsed(long used) { + return containerData.addBytesUsed(used); + } + + /** + * Set the number of bytes used by the container. + * @param used the number of bytes used by the container. + */ + public void setBytesUsed(long used) { + containerData.setBytesUsed(used); + } + + /** + * Decrease the number of bytes used by the container. + * @param reclaimed the number of bytes reclaimed from the container. + * @return the current number of bytes used by the container after decrease. + */ + public long decrBytesUsed(long reclaimed) { + return this.containerData.addBytesUsed(-1L * reclaimed); + } + + /** + * Get the maximum container size. + * @return the maximum container size. + */ + public long getMaxSize() { + return containerData.getMaxSize(); + } + + /** + * Set the maximum container size. + * @param size the maximum container size. + */ + public void setMaxSize(long size) { + this.containerData.setMaxSize(size); + } + + /** + * Get the number of keys in the container. + * @return the number of keys in the container. + */ + public long getNumKeys() { + return containerData.getKeyCount(); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java index bee537d1f8..205bfb0d68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -320,7 +320,7 @@ private ContainerCommandResponseProto handleUpdateContainer( .getContainerData().getName(); ContainerData data = ContainerData.getFromProtBuf( - msg.getUpdateContainer().getContainerData()); + msg.getUpdateContainer().getContainerData(), conf); boolean forceUpdate = msg.getUpdateContainer().getForceUpdate(); this.containerManager.updateContainer( pipeline, containerName, data, forceUpdate); @@ -389,7 +389,7 @@ private ContainerCommandResponseProto handleCreateContainer( return ContainerUtils.malformedRequest(msg); } ContainerData cData = ContainerData.getFromProtBuf( - msg.getCreateContainer().getContainerData()); + msg.getCreateContainer().getContainerData(), conf); Preconditions.checkNotNull(cData, "Container data is null"); Pipeline pipeline = Pipeline.getFromProtoBuf( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java index 98661b869d..eb18486213 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java @@ -78,10 +78,10 @@ public void putKey(Pipeline pipeline, KeyData data) throws IOException { // We are not locking the key manager since LevelDb serializes all actions // against a single DB. We rely on DB level locking to avoid conflicts. Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - Preconditions.checkNotNull(pipeline.getContainerName(), + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, "Container name cannot be null"); - ContainerData cData = containerManager.readContainer( - pipeline.getContainerName()); + ContainerData cData = containerManager.readContainer(containerName); MetadataStore db = KeyUtils.getDB(cData, conf); // This is a post condition that acts as a hint to the user. @@ -92,7 +92,6 @@ public void putKey(Pipeline pipeline, KeyData data) throws IOException { } finally { containerManager.readUnlock(); } - } /** @@ -135,10 +134,10 @@ public void deleteKey(Pipeline pipeline, String keyName) containerManager.readLock(); try { Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - Preconditions.checkNotNull(pipeline.getContainerName(), + String containerName = pipeline.getContainerName(); + Preconditions.checkNotNull(containerName, "Container name cannot be null"); - ContainerData cData = containerManager.readContainer(pipeline - .getContainerName()); + ContainerData cData = containerManager.readContainer(containerName); MetadataStore db = KeyUtils.getDB(cData, conf); // This is a post condition that acts as a hint to the user. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java index e8eb125352..04afc3c921 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java @@ -21,8 +21,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.util.RwLock; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; @@ -44,10 +47,11 @@ public interface ContainerManager extends RwLock { * * @param config - Configuration. * @param containerDirs - List of Metadata Container locations. + * @param datanodeID - Datanode ID * @throws StorageContainerException */ - void init(Configuration config, List containerDirs) - throws IOException; + void init(Configuration config, List containerDirs, + DatanodeID datanodeID) throws IOException; /** * Creates a container with the given name. @@ -173,6 +177,13 @@ void closeContainer(String containerName) */ SCMNodeReport getNodeReport() throws IOException; + /** + * Gets container report. + * @return container report. + * @throws IOException + */ + ContainerReportsRequestProto getContainerReport() throws IOException; + /** * Gets container reports. * @return List of all closed containers. @@ -199,4 +210,67 @@ void closeContainer(String containerName) * container id */ void decrPendingDeletionBlocks(int numBlocks, String containerId); + + /** + * Increase the read count of the container. + * @param containerName - Name of the container. + */ + void incrReadCount(String containerName); + + /** + * Increse the read counter for bytes read from the container. + * @param containerName - Name of the container. + * @param readBytes - bytes read from the container. + */ + void incrReadBytes(String containerName, long readBytes); + + + /** + * Increase the write count of the container. + * @param containerName - Name of the container. + */ + void incrWriteCount(String containerName); + + /** + * Increase the write counter for bytes write into the container. + * @param containerName - Name of the container. + * @param writeBytes - bytes write into the container. + */ + void incrWriteBytes(String containerName, long writeBytes); + + /** + * Increase the bytes used by the container. + * @param containerName - Name of the container. + * @param used - additional bytes used by the container. + * @return the current bytes used. + */ + long incrBytesUsed(String containerName, long used); + + /** + * Decrease the bytes used by the container. + * @param containerName - Name of the container. + * @param used - additional bytes reclaimed by the container. + * @return the current bytes used. + */ + long decrBytesUsed(String containerName, long used); + + /** + * Get the bytes used by the container. + * @param containerName - Name of the container. + * @return the current bytes used by the container. + */ + long getBytesUsed(String containerName); + + /** + * Get the number of keys in the container. + * @param containerName - Name of the container. + * @return the current key count. + */ + long getNumKeys(String containerName); + + /** + * Get the container report state to send via HB to SCM. + * @return container report state. + */ + ReportState getContainerReportState(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java new file mode 100644 index 0000000000..7d2ba0ac04 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerReportManager.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.interfaces; + +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; + +/** + * Interface for container report manager operations. + */ +public interface ContainerReportManager { + + /** + * Get the container report state. + * @return the container report state. + */ + ReportState getContainerReportState(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 6e282c528f..349ed3f8cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -130,6 +130,7 @@ private void start() throws IOException { LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); nextHB.set(Time.monotonicNow() + heartbeatFrequency); context.setReportState(container.getNodeReport()); + context.setContainerReportState(container.getContainerReportState()); context.execute(executorService, heartbeatFrequency, TimeUnit.MILLISECONDS); now = Time.monotonicNow(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java index a30783d6ce..57a1516718 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java @@ -16,21 +16,18 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.helpers.ContainerReport; import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.List; /** * Container Report handler. @@ -62,22 +59,13 @@ public void handle(SCMCommand command, OzoneContainer container, invocationCount++; long startTime = Time.monotonicNow(); try { - ContainerReportsProto.Builder contianerReportsBuilder = - ContainerReportsProto.newBuilder(); - List closedContainerList = container.getContainerReports(); - for (ContainerData cd : closedContainerList) { - ContainerReport report = - new ContainerReport(cd.getContainerName(), cd.getHash()); - contianerReportsBuilder.addReports(report.getProtoBufMessage()); - } - contianerReportsBuilder.setType(ContainerReportsProto.reportType - .fullReport); + ContainerReportsRequestProto contianerReport = + container.getContainerReport(); // TODO : We send this report to all SCMs.Check if it is enough only to // send to the leader once we have RAFT enabled SCMs. for (EndpointStateMachine endPoint : connectionManager.getValues()) { - endPoint.getEndPoint().sendContainerReport( - contianerReportsBuilder.build()); + endPoint.getEndPoint().sendContainerReport(contianerReport); } } catch (IOException ex) { LOG.error("Unable to process the Container Report command.", ex); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 45f14d2bb4..e798b72504 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -36,8 +36,9 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +97,7 @@ public OzoneContainer(DatanodeID datanodeID, Configuration ozoneConfig) throws } manager = new ContainerManagerImpl(); - manager.init(this.ozoneConfig, locations); + manager.init(this.ozoneConfig, locations, datanodeID); this.chunkManager = new ChunkManagerImpl(manager); manager.setChunkManager(this.chunkManager); @@ -234,6 +235,16 @@ public int getRatisContainerServerPort() { return getPortbyType(OzoneProtos.ReplicationType.RATIS); } + /** + * Returns container report. + * @return - container report. + * @throws IOException + */ + public ContainerReportsRequestProto getContainerReport() throws IOException { + return this.manager.getContainerReport(); + } + +// TODO: remove getContainerReports /** * Returns the list of closed containers. * @return - List of closed containers. @@ -247,4 +258,12 @@ public List getContainerReports() throws IOException { public ContainerManager getContainerManager() { return this.manager; } + + /** + * Get the container report state to send via HB to SCM. + * @return the container report state. + */ + public ReportState getContainerReportState() { + return this.manager.getContainerReportState(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java index 922043ffd5..cb04668bfa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList; import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataKeyFilters; import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; import org.apache.hadoop.utils.MetadataStore; @@ -459,10 +460,9 @@ private VolumeList getAllVolumes() throws IOException { public List getPendingDeletionKeys(final int count) throws IOException { List keyBlocksList = Lists.newArrayList(); - final MetadataKeyFilter deletingKeyFilter = - new KeyPrefixFilter(DELETING_KEY_PREFIX); List> rangeResult = - store.getSequentialRangeKVs(null, count, deletingKeyFilter); + store.getRangeKVs(null, count, + MetadataKeyFilters.getDeletingKeyFilter()); for (Map.Entry entry : rangeResult) { KsmKeyInfo info = KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue())); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java index a7b87174c7..ddcc261e1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -18,7 +18,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; @@ -65,12 +66,12 @@ SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, /** * Send a container report. - * @param reports -- Container report - * @return HeartbeatRespose.nullcommand. + * @param reports -- Container report. + * @return container reports response. * @throws IOException */ - SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports) - throws IOException; + ContainerReportsResponseProto sendContainerReport( + ContainerReportsRequestProto reports) throws IOException; /** * Used by datanode to send block deletion ACK to SCM. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java index 9feec785f0..e7163a67be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; @@ -56,9 +57,10 @@ public interface StorageContainerNodeProtocol { * Send heartbeat to indicate the datanode is alive and doing well. * @param datanodeID - Datanode ID. * @param nodeReport - node report. + * @param reportState - container report. * @return SCMheartbeat response list */ List sendHeartbeat(DatanodeID datanodeID, - SCMNodeReport nodeReport); + SCMNodeReport nodeReport, ReportState reportState); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java index 033513dc9a..50d4fbb7cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -23,7 +23,8 @@ import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.ozone.protocol.proto @@ -168,9 +169,9 @@ public SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, * @throws IOException */ @Override - public SCMHeartbeatResponseProto sendContainerReport( - ContainerReportsProto reports) throws IOException { - final SCMHeartbeatResponseProto resp; + public ContainerReportsResponseProto sendContainerReport( + ContainerReportsRequestProto reports) throws IOException { + final ContainerReportsResponseProto resp; try { resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports); } catch (ServiceException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java index 9808f3ef68..2fd7038676 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -21,9 +21,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; @@ -89,9 +90,8 @@ public StorageContainerDatanodeProtocolServerSideTranslatorPB( } @Override - public SCMHeartbeatResponseProto - sendContainerReport(RpcController controller, - ContainerReportsProto request) + public ContainerReportsResponseProto sendContainerReport( + RpcController controller, ContainerReportsRequestProto request) throws ServiceException { try { return impl.sendContainerReport(request); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index 89102d5369..719d642a79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -43,7 +43,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; @@ -716,7 +717,7 @@ public SCMVersionResponseProto getVersion( public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport, ReportState reportState) throws IOException { List commands = - getScmNodeManager().sendHeartbeat(datanodeID, nodeReport); + getScmNodeManager().sendHeartbeat(datanodeID, nodeReport, reportState); List cmdResponses = new LinkedList<>(); for (SCMCommand cmd : commands) { cmdResponses.add(getCommandResponse(cmd)); @@ -749,13 +750,12 @@ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, * @throws IOException */ @Override - public SCMHeartbeatResponseProto - sendContainerReport(ContainerReportsProto reports) throws IOException { - // TODO : fix this in the server side code changes for handling this request - // correctly. - List cmdResponses = new LinkedList<>(); - return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses) - .build(); + public ContainerReportsResponseProto sendContainerReport( + ContainerReportsRequestProto reports) throws IOException { + + // TODO: handle the container reports either here or add container report + // handler. + return ContainerReportsResponseProto.newBuilder().build(); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java index 75d712df17..f9900d3a14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.CommandQueue; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -260,7 +260,8 @@ private void initPoolProcessThread() { * @param containerReport -- Container report for a specific container from * a datanode. */ - public void handleContainerReport(ContainerReportsProto containerReport) { + public void handleContainerReport( + ContainerReportsRequestProto containerReport) { String poolName = null; DatanodeID datanodeID = DatanodeID .getFromProtoBuf(containerReport.getDatanodeID()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java index d3dcc01d1f..e5cf1b342a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java @@ -22,7 +22,7 @@ import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.node.CommandQueue; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.NodePoolManager; @@ -234,11 +234,13 @@ private NodeState getNodestate(DatanodeID id) { * * @param containerReport - ContainerReport */ - public void handleContainerReport(ContainerReportsProto containerReport) { + public void handleContainerReport( + ContainerReportsRequestProto containerReport) { executorService.submit(processContainerReport(containerReport)); } - private Runnable processContainerReport(ContainerReportsProto reports) { + private Runnable processContainerReport( + ContainerReportsRequestProto reports) { return () -> { DatanodeID datanodeID = DatanodeID.getFromProtoBuf(reports.getDatanodeID()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java index 894455819a..1e755b23e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; @@ -32,18 +33,21 @@ public class HeartbeatQueueItem { private DatanodeID datanodeID; private long recvTimestamp; private SCMNodeReport nodeReport; + private ReportState containerReportState; /** * * @param datanodeID - datanode ID of the heartbeat. * @param recvTimestamp - heartbeat receive timestamp. * @param nodeReport - node report associated with the heartbeat if any. + * @param containerReportState - container report state. */ HeartbeatQueueItem(DatanodeID datanodeID, long recvTimestamp, - SCMNodeReport nodeReport) { + SCMNodeReport nodeReport, ReportState containerReportState) { this.datanodeID = datanodeID; this.recvTimestamp = recvTimestamp; this.nodeReport = nodeReport; + this.containerReportState = containerReportState; } /** @@ -60,6 +64,13 @@ public SCMNodeReport getNodeReport() { return nodeReport; } + /** + * @return container report state. + */ + public ReportState getContainerReportState() { + return containerReportState; + } + /** * @return heartbeat receive timestamp. */ @@ -73,6 +84,7 @@ public long getRecvTimestamp() { public static class Builder { private DatanodeID datanodeID; private SCMNodeReport nodeReport; + private ReportState containerReportState; private long recvTimestamp = monotonicNow(); public Builder setDatanodeID(DatanodeID datanodeId) { @@ -85,6 +97,11 @@ public Builder setNodeReport(SCMNodeReport scmNodeReport) { return this; } + public Builder setContainerReportState(ReportState crs) { + this.containerReportState = crs; + return this; + } + @VisibleForTesting public Builder setRecvTimestamp(long recvTime) { this.recvTimestamp = recvTime; @@ -92,7 +109,8 @@ public Builder setRecvTimestamp(long recvTime) { } public HeartbeatQueueItem build() { - return new HeartbeatQueueItem(datanodeID, recvTimestamp, nodeReport); + return new HeartbeatQueueItem(datanodeID, recvTimestamp, nodeReport, + containerReportState); } } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index ebe67c17b0..0573f9e313 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -31,6 +31,8 @@ import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto .ErrorCode; @@ -769,12 +771,13 @@ private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) { * * @param datanodeID - Datanode ID. * @param nodeReport - node report. + * @param containerReportState - container report state. * @return SCMheartbeat response. * @throws IOException */ @Override public List sendHeartbeat(DatanodeID datanodeID, - SCMNodeReport nodeReport) { + SCMNodeReport nodeReport, ReportState containerReportState) { // Checking for NULL to make sure that we don't get // an exception from ConcurrentList. @@ -785,6 +788,7 @@ public List sendHeartbeat(DatanodeID datanodeID, new HeartbeatQueueItem.Builder() .setDatanodeID(datanodeID) .setNodeReport(nodeReport) + .setContainerReportState(containerReportState) .build()); } else { LOG.error("Datanode ID in heartbeat is null"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java index f98d077604..3ff0a948a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java @@ -19,12 +19,30 @@ import com.google.common.base.Strings; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConsts; /** * An utility class to filter levelDB keys. */ -public class MetadataKeyFilters { +public final class MetadataKeyFilters { + private static KeyPrefixFilter deletingKeyFilter = + new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX); + + private static KeyPrefixFilter normalKeyFilter = + new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX, + true); + + private MetadataKeyFilters() { + } + + public static KeyPrefixFilter getDeletingKeyFilter() { + return deletingKeyFilter; + } + + public static KeyPrefixFilter getNormalKeyFilter() { + return normalKeyFilter; + } /** * Interface for levelDB key filters. */ @@ -57,25 +75,34 @@ public static class KeyPrefixFilter implements MetadataKeyFilter { private String keyPrefix = null; private int keysScanned = 0; private int keysHinted = 0; + private Boolean negative; public KeyPrefixFilter(String keyPrefix) { + this(keyPrefix, false); + } + + public KeyPrefixFilter(String keyPrefix, boolean negative) { this.keyPrefix = keyPrefix; + this.negative = negative; } @Override public boolean filterKey(byte[] preKey, byte[] currentKey, byte[] nextKey) { keysScanned++; + boolean accept = false; if (Strings.isNullOrEmpty(keyPrefix)) { - return true; + accept = true; } else { if (currentKey != null && DFSUtil.bytes2String(currentKey).startsWith(keyPrefix)) { keysHinted++; - return true; + accept = true; + } else { + accept = false; } - return false; } + return (negative) ? !accept : accept; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto index bb3d137c2d..88518e3fe6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -96,7 +96,13 @@ message ContainerInfo { required string containerName = 1; required string finalhash = 2; optional int64 size = 3; - optional int64 keycount = 4; + optional int64 used = 4; + optional int64 keyCount = 5; + // TODO: move the io count to separate message + optional int64 readCount = 6; + optional int64 writeCount = 7; + optional int64 readBytes = 8; + optional int64 writeBytes = 9; } // The deleted blocks which are stored in deletedBlock.db of scm. @@ -112,7 +118,7 @@ message DeletedBlocksTransaction { A set of container reports, max count is generally set to 8192 since that keeps the size of the reports under 1 MB. */ -message ContainerReportsProto { +message ContainerReportsRequestProto { enum reportType { fullReport = 0; deltaReport = 1; @@ -122,6 +128,9 @@ message ContainerReportsProto { required reportType type = 3; } +message ContainerReportsResponseProto { +} + /** * This message is send along with the heart beat to report datanode * storage utilization by SCM. @@ -337,7 +346,7 @@ service StorageContainerDatanodeProtocolService { send container reports sends the container report to SCM. This will return a null command as response. */ - rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto); + rpc sendContainerReport(ContainerReportsRequestProto) returns (ContainerReportsResponseProto); /** * Sends the block deletion ACK to SCM. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index 67f33f9c3e..268bc6ffbf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -345,6 +345,14 @@ etc. This picks one of those for this cluster. + + ozone.container.report.interval.ms + 60000 + OZONE, CONTAINER, MANAGEMENT + Time interval in milliseconds of the datanode to send container + report. Each datanode periodically send container report upon receive + sendContainerReport from SCM. + ozone.administrators diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java index 3f7a4abe88..bbe24cf766 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java @@ -19,7 +19,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.NodePoolManager; @@ -58,9 +58,9 @@ public ReplicationDatanodeStateManager(NodeManager nodeManager, * @param dataNodeCount - Datanode Count. * @return List of Container Reports. */ - public List getContainerReport(String containerName, - String poolName, int dataNodeCount) { - List containerList = new LinkedList<>(); + public List getContainerReport( + String containerName, String poolName, int dataNodeCount) { + List containerList = new LinkedList<>(); List nodesInPool = poolManager.getNodes(poolName); if (nodesInPool == null) { @@ -81,10 +81,10 @@ public List getContainerReport(String containerName, .setContainerName(containerName) .setFinalhash(DigestUtils.sha256Hex(containerName)) .build(); - ContainerReportsProto containerReport = ContainerReportsProto - .newBuilder().addReports(info) + ContainerReportsRequestProto containerReport = + ContainerReportsRequestProto.newBuilder().addReports(info) .setDatanodeID(id.getProtoBufMessage()) - .setType(ContainerReportsProto.reportType.fullReport) + .setType(ContainerReportsRequestProto.reportType.fullReport) .build(); containerList.add(containerReport); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java index 93cbffa831..94f2a17fbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java @@ -21,6 +21,8 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol.proto @@ -276,11 +278,12 @@ public SCMCommand register(DatanodeID datanodeID) { * * @param datanodeID - Datanode ID. * @param nodeReport - node report. + * @param containerReportState - container report state. * @return SCMheartbeat response list */ @Override public List sendHeartbeat(DatanodeID datanodeID, - SCMNodeReport nodeReport) { + SCMNodeReport nodeReport, ReportState containerReportState) { return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java index edef2b478c..dcaf68440a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java @@ -21,6 +21,7 @@ import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; @@ -30,8 +31,11 @@ import org.apache.hadoop.ozone.scm.VersionInfo; import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -44,8 +48,10 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { private AtomicInteger rpcCount = new AtomicInteger(0); private ReportState reportState; private AtomicInteger containerReportsCount = new AtomicInteger(0); - private AtomicInteger closedContainerCount = new AtomicInteger(0); + // Map of datanode to containers + private Map> nodeContainers = + new HashMap(); /** * Returns the number of heartbeats made to this class. * @@ -91,11 +97,37 @@ public int getContainerReportsCount() { } /** - * Returns the number of closed containers that have been reported so far. - * @return - count of closed containers. + * Returns the number of containers that have been reported so far. + * @return - count of reported containers. */ - public int getClosedContainerCount() { - return closedContainerCount.get(); + public long getContainerCount() { + return nodeContainers.values().parallelStream().mapToLong((containerMap)->{ + return containerMap.size(); + }).sum(); + } + + /** + * Get the number keys reported from container reports. + * @return - number of keys reported. + */ + public long getKeyCount() { + return nodeContainers.values().parallelStream().mapToLong((containerMap)->{ + return containerMap.values().parallelStream().mapToLong((container) -> { + return container.getKeyCount(); + }).sum(); + }).sum(); + } + + /** + * Get the number of bytes used from container reports. + * @return - number of bytes used. + */ + public long getBytesUsed() { + return nodeContainers.values().parallelStream().mapToLong((containerMap)->{ + return containerMap.values().parallelStream().mapToLong((container) -> { + return container.getUsed(); + }).sum(); + }).sum(); } /** @@ -178,16 +210,28 @@ private void sleepIfNeeded() { * @throws IOException */ @Override - public SCMHeartbeatResponseProto + public StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto sendContainerReport(StorageContainerDatanodeProtocolProtos - .ContainerReportsProto reports) throws IOException { + .ContainerReportsRequestProto reports) throws IOException { Preconditions.checkNotNull(reports); containerReportsCount.incrementAndGet(); - closedContainerCount.addAndGet(reports.getReportsCount()); - List - cmdResponses = new LinkedList<>(); - return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses) - .build(); + + DatanodeID datanode = DatanodeID.getFromProtoBuf(reports.getDatanodeID()); + if (reports.getReportsCount() > 0) { + Map containers = nodeContainers.get(datanode); + if (containers == null) { + containers = new LinkedHashMap(); + nodeContainers.put(datanode, containers); + } + + for (StorageContainerDatanodeProtocolProtos.ContainerInfo report: + reports.getReportsList()) { + containers.put(report.getContainerName(), report); + } + } + + return StorageContainerDatanodeProtocolProtos + .ContainerReportsResponseProto.newBuilder().build(); } @Override @@ -200,4 +244,18 @@ public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( public ReportState getReportState() { return this.reportState; } + + /** + * Reset the mock Scm for test to get a fresh start without rebuild MockScm. + */ + public void reset() { + heartbeatCount.set(0); + rpcCount.set(0); + reportState = ReportState.newBuilder() + .setState(ReportState.states.noContainerReports) + .setCount(0).build(); + containerReportsCount.set(0); + nodeContainers.clear(); + + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java index 775c052c64..a5fd99060d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; @@ -112,7 +113,7 @@ private ContainerManager createContainerManager(Configuration conf) ContainerManager containerManager = new ContainerManagerImpl(); List pathLists = new LinkedList<>(); pathLists.add(StorageLocation.parse(containersDir.getAbsolutePath())); - containerManager.init(conf, pathLists); + containerManager.init(conf, pathLists, DFSTestUtil.getLocalDatanodeID()); return containerManager; } @@ -126,7 +127,7 @@ private void createToDeleteBlocks(ContainerManager mgr, int numOfChunksPerBlock, File chunkDir) throws IOException { for (int x = 0; x < numOfContainers; x++) { String containerName = OzoneUtils.getRequestID(); - ContainerData data = new ContainerData(containerName); + ContainerData data = new ContainerData(containerName, conf); mgr.createContainer(createSingleNodePipeline(containerName), data); data = mgr.readContainer(containerName); MetadataStore metadata = KeyUtils.getDB(data, conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java index d976aad4bb..1f3ec827e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java @@ -98,10 +98,14 @@ public void setUp() throws Exception { .getTempPath(TestDatanodeStateMachine.class.getSimpleName()); testRoot = new File(path); if (!testRoot.mkdirs()) { - LOG.info("Required directories already exist."); + LOG.info("Required directories {} already exist.", testRoot); + } + + File dataDir = new File(testRoot, "data"); + conf.set(DFS_DATANODE_DATA_DIR_KEY, dataDir.getAbsolutePath()); + if (!dataDir.mkdirs()) { + LOG.info("Data dir create failed."); } - conf.set(DFS_DATANODE_DATA_DIR_KEY, - new File(testRoot, "data").getAbsolutePath()); conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, new File(testRoot, "scm").getAbsolutePath()); path = Paths.get(path.toString(), @@ -334,6 +338,7 @@ public void testDatanodeStateMachineWithInvalidConfiguration() confList.forEach((entry) -> { Configuration perTestConf = new Configuration(conf); perTestConf.setStrings(entry.getKey(), entry.getValue()); + LOG.info("Test with {} = {}", entry.getKey(), entry.getValue()); try (DatanodeStateMachine stateMachine = new DatanodeStateMachine( DFSTestUtil.getLocalDatanodeID(), perTestConf)) { DatanodeStateMachine.DatanodeStates currentState = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 3310801730..820d205971 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerReport; import org.apache.hadoop.ozone.container.common.statemachine .DatanodeStateMachine; @@ -39,6 +40,10 @@ .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.ozone.protocol.proto @@ -97,8 +102,9 @@ public static void setUp() throws Exception { scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(), scmServerImpl, serverAddress, 10); testDir = PathUtils.getTestDir(TestEndPoint.class); - defaultReportState = StorageContainerDatanodeProtocolProtos.ReportState. - newBuilder().setState(noContainerReports).setCount(0).build(); + defaultReportState = StorageContainerDatanodeProtocolProtos. + ReportState.newBuilder().setState(noContainerReports). + setCount(0).build(); } @Test @@ -368,11 +374,11 @@ ContainerReport getRandomContainerReport() { * @param count - The number of closed containers to create. * @return ContainerReportsProto */ - StorageContainerDatanodeProtocolProtos.ContainerReportsProto + StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto createDummyContainerReports(int count) { - StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder + StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder reportsBuilder = StorageContainerDatanodeProtocolProtos - .ContainerReportsProto.newBuilder(); + .ContainerReportsRequestProto.newBuilder(); for (int x = 0; x < count; x++) { reportsBuilder.addReports(getRandomContainerReport() .getProtoBufMessage()); @@ -380,7 +386,7 @@ ContainerReport getRandomContainerReport() { reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID() .getProtoBufMessage()); reportsBuilder.setType(StorageContainerDatanodeProtocolProtos - .ContainerReportsProto.reportType.fullReport); + .ContainerReportsRequestProto.reportType.fullReport); return reportsBuilder.build(); } @@ -391,15 +397,63 @@ ContainerReport getRandomContainerReport() { @Test public void testContainerReportSend() throws Exception { final int count = 1000; + scmServerImpl.reset(); try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(SCMTestUtils.getConf(), serverAddress, 1000)) { - SCMHeartbeatResponseProto responseProto = rpcEndPoint + ContainerReportsResponseProto responseProto = rpcEndPoint .getEndPoint().sendContainerReport(createDummyContainerReports( count)); Assert.assertNotNull(responseProto); } Assert.assertEquals(1, scmServerImpl.getContainerReportsCount()); - Assert.assertEquals(count, scmServerImpl.getClosedContainerCount()); + Assert.assertEquals(count, scmServerImpl.getContainerCount()); + } + + + /** + * Tests that rpcEndpoint sendContainerReport works as expected. + * @throws Exception + */ + @Test + public void testContainerReport() throws Exception { + final int count = 1000; + scmServerImpl.reset(); + try (EndpointStateMachine rpcEndPoint = + SCMTestUtils.createEndpoint(SCMTestUtils.getConf(), + serverAddress, 1000)) { + ContainerReportsResponseProto responseProto = rpcEndPoint + .getEndPoint().sendContainerReport(createContainerReport(count)); + Assert.assertNotNull(responseProto); + } + Assert.assertEquals(1, scmServerImpl.getContainerReportsCount()); + Assert.assertEquals(count, scmServerImpl.getContainerCount()); + final long expectedKeyCount = count * 1000; + Assert.assertEquals(expectedKeyCount, scmServerImpl.getKeyCount()); + final long expectedBytesUsed = count * OzoneConsts.GB * 2; + Assert.assertEquals(expectedBytesUsed, scmServerImpl.getBytesUsed()); + } + + private ContainerReportsRequestProto createContainerReport(int count) { + StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder + reportsBuilder = StorageContainerDatanodeProtocolProtos + .ContainerReportsRequestProto.newBuilder(); + for (int x = 0; x < count; x++) { + ContainerReport report = new ContainerReport(UUID.randomUUID().toString(), + DigestUtils.sha256Hex("Simulated")); + report.setKeyCount(1000); + report.setSize(OzoneConsts.GB * 5); + report.setBytesUsed(OzoneConsts.GB * 2); + report.setReadCount(100); + report.setReadBytes(OzoneConsts.GB * 1); + report.setWriteCount(50); + report.setWriteBytes(OzoneConsts.GB * 2); + reportsBuilder.addReports(report.getProtoBufMessage()); + } + reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID() + .getProtoBufMessage()); + reportsBuilder.setType(StorageContainerDatanodeProtocolProtos + .ContainerReportsRequestProto.reportType.fullReport); + return reportsBuilder.build(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java index c331e0e0ec..b2a61abd11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java @@ -28,7 +28,9 @@ import java.util.Random; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.conf.OzoneConfiguration; @@ -87,12 +89,12 @@ public void testRandomChoosingPolicy() throws IOException { List pathLists = new LinkedList<>(); pathLists.add(StorageLocation.parse(containerDir.getAbsolutePath())); containerManager = new ContainerManagerImpl(); - containerManager.init(conf, pathLists); + containerManager.init(conf, pathLists, DFSTestUtil.getLocalDatanodeID()); int numContainers = 10; for (int i = 0; i < numContainers; i++) { String containerName = OzoneUtils.getRequestID(); - ContainerData data = new ContainerData(containerName); + ContainerData data = new ContainerData(containerName, conf); containerManager.createContainer(createSingleNodePipeline(containerName), data); Assert.assertTrue( @@ -133,7 +135,8 @@ public void testTopNOrderedChoosingPolicy() throws IOException { List pathLists = new LinkedList<>(); pathLists.add(StorageLocation.parse(containerDir.getAbsolutePath())); containerManager = new ContainerManagerImpl(); - containerManager.init(conf, pathLists); + DatanodeID datanodeID = DFSTestUtil.getLocalDatanodeID(); + containerManager.init(conf, pathLists, datanodeID); int numContainers = 10; Random random = new Random(); @@ -141,7 +144,7 @@ public void testTopNOrderedChoosingPolicy() throws IOException { // create [numContainers + 1] containers for (int i = 0; i <= numContainers; i++) { String containerName = OzoneUtils.getRequestID(); - ContainerData data = new ContainerData(containerName); + ContainerData data = new ContainerData(containerName, conf); containerManager.createContainer(createSingleNodePipeline(containerName), data); Assert.assertTrue( @@ -169,7 +172,7 @@ public void testTopNOrderedChoosingPolicy() throws IOException { containerManager.writeLock(); containerManager.shutdown(); containerManager.writeUnlock(); - containerManager.init(conf, pathLists); + containerManager.init(conf, pathLists, datanodeID); List result0 = containerManager .chooseContainerForBlockDeletion(5); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 972752f2b6..5cfd3e5864 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -143,11 +144,25 @@ public void setupPaths() throws IOException { loc.getNormalizedUri()); } pathLists.add(loc); - containerManager.init(conf, pathLists); + + for (String dir : conf.getStrings(DFS_DATANODE_DATA_DIR_KEY)) { + StorageLocation location = StorageLocation.parse(dir); + FileUtils.forceMkdir(new File(location.getNormalizedUri())); + } + + containerManager.init(conf, pathLists, DFSTestUtil.getLocalDatanodeID()); } @After public void cleanupDir() throws IOException { + // Shutdown containerManager + containerManager.writeLock(); + try { + containerManager.shutdown(); + } finally { + containerManager.writeUnlock(); + } + // Clean up SCM metadata log.info("Deletting {}", path); FileUtils.deleteDirectory(new File(path)); @@ -163,7 +178,7 @@ public void cleanupDir() throws IOException { public void testCreateContainer() throws Exception { String containerName = OzoneUtils.getRequestID(); - ContainerData data = new ContainerData(containerName); + ContainerData data = new ContainerData(containerName, conf); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); containerManager.createContainer(createSingleNodePipeline(containerName), @@ -199,7 +214,7 @@ public void testCreateContainer() throws Exception { public void testCreateDuplicateContainer() throws Exception { String containerName = OzoneUtils.getRequestID(); - ContainerData data = new ContainerData(containerName); + ContainerData data = new ContainerData(containerName, conf); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); containerManager.createContainer(createSingleNodePipeline(containerName), @@ -219,14 +234,14 @@ public void testDeleteContainer() throws Exception { String containerName2 = OzoneUtils.getRequestID(); - ContainerData data = new ContainerData(containerName1); + ContainerData data = new ContainerData(containerName1, conf); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); containerManager.createContainer(createSingleNodePipeline(containerName1), data); containerManager.closeContainer(containerName1); - data = new ContainerData(containerName2); + data = new ContainerData(containerName2, conf); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); containerManager.createContainer(createSingleNodePipeline(containerName2), @@ -246,7 +261,7 @@ public void testDeleteContainer() throws Exception { // Let us make sure that we are able to re-use a container name after // delete. - data = new ContainerData(containerName1); + data = new ContainerData(containerName1, conf); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); containerManager.createContainer(createSingleNodePipeline(containerName1), @@ -284,7 +299,7 @@ public void testGetContainerReports() throws Exception{ for (int i = 0; i < count; i++) { String containerName = OzoneUtils.getRequestID(); - ContainerData data = new ContainerData(containerName); + ContainerData data = new ContainerData(containerName, conf); containerManager.createContainer(createSingleNodePipeline(containerName), data); @@ -321,7 +336,7 @@ public void testListContainer() throws IOException { for (int x = 0; x < count; x++) { String containerName = OzoneUtils.getRequestID(); - ContainerData data = new ContainerData(containerName); + ContainerData data = new ContainerData(containerName, conf); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); containerManager.createContainer(createSingleNodePipeline(containerName), @@ -355,7 +370,7 @@ private ChunkInfo writeChunkHelper(String containerName, String keyName, NoSuchAlgorithmException { final int datalen = 1024; pipeline.setContainerName(containerName); - ContainerData cData = new ContainerData(containerName); + ContainerData cData = new ContainerData(containerName, conf); cData.addMetadata("VOLUME", "shire"); cData.addMetadata("owner", "bilbo"); if(!containerManager.getContainerMap() @@ -404,7 +419,7 @@ public void testWritReadManyChunks() throws IOException, Map fileHashMap = new HashMap<>(); pipeline.setContainerName(containerName); - ContainerData cData = new ContainerData(containerName); + ContainerData cData = new ContainerData(containerName, conf); cData.addMetadata("VOLUME", "shire"); cData.addMetadata("owner)", "bilbo"); containerManager.createContainer(pipeline, cData); @@ -468,7 +483,7 @@ public void testPartialRead() throws Exception { Pipeline pipeline = createSingleNodePipeline(containerName); pipeline.setContainerName(containerName); - ContainerData cData = new ContainerData(containerName); + ContainerData cData = new ContainerData(containerName, conf); cData.addMetadata("VOLUME", "shire"); cData.addMetadata("owner)", "bilbo"); containerManager.createContainer(pipeline, cData); @@ -503,7 +518,7 @@ public void testOverWrite() throws IOException, Pipeline pipeline = createSingleNodePipeline(containerName); pipeline.setContainerName(containerName); - ContainerData cData = new ContainerData(containerName); + ContainerData cData = new ContainerData(containerName, conf); cData.addMetadata("VOLUME", "shire"); cData.addMetadata("owner)", "bilbo"); containerManager.createContainer(pipeline, cData); @@ -521,6 +536,11 @@ public void testOverWrite() throws IOException, // With the overwrite flag it should work now. info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); chunkManager.writeChunk(pipeline, keyName, info, data); + long bytesUsed = containerManager.getBytesUsed(containerName); + Assert.assertEquals(datalen, bytesUsed); + + long bytesWrite = containerManager.getWriteBytes(containerName); + Assert.assertEquals(datalen * 2, bytesWrite); } /** @@ -541,7 +561,7 @@ public void testMultipleWriteSingleRead() throws IOException, Pipeline pipeline = createSingleNodePipeline(containerName); pipeline.setContainerName(containerName); - ContainerData cData = new ContainerData(containerName); + ContainerData cData = new ContainerData(containerName, conf); cData.addMetadata("VOLUME", "shire"); cData.addMetadata("owner)", "bilbo"); containerManager.createContainer(pipeline, cData); @@ -580,7 +600,7 @@ public void testDeleteChunk() throws IOException, Pipeline pipeline = createSingleNodePipeline(containerName); pipeline.setContainerName(containerName); - ContainerData cData = new ContainerData(containerName); + ContainerData cData = new ContainerData(containerName, conf); cData.addMetadata("VOLUME", "shire"); cData.addMetadata("owner)", "bilbo"); containerManager.createContainer(pipeline, cData); @@ -626,22 +646,35 @@ public void testPutKey() throws IOException, NoSuchAlgorithmException { @Test public void testPutKeyWithLotsOfChunks() throws IOException, NoSuchAlgorithmException { - final int chunkCount = 1024; + final int chunkCount = 2; final int datalen = 1024; + long totalSize = 0L; String containerName = OzoneUtils.getRequestID(); String keyName = OzoneUtils.getRequestID(); Pipeline pipeline = createSingleNodePipeline(containerName); List chunkList = new LinkedList<>(); ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline); + totalSize += datalen; chunkList.add(info); for (int x = 1; x < chunkCount; x++) { + // with holes in the front (before x * datalen) info = getChunk(keyName, x, x * datalen, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(pipeline, keyName, info, data); + totalSize += datalen * (x + 1); chunkList.add(info); } + long bytesUsed = containerManager.getBytesUsed(containerName); + Assert.assertEquals(totalSize, bytesUsed); + long writeBytes = containerManager.getWriteBytes(containerName); + Assert.assertEquals(chunkCount * datalen, writeBytes); + long readCount = containerManager.getReadCount(containerName); + Assert.assertEquals(0, readCount); + long writeCount = containerManager.getWriteCount(containerName); + Assert.assertEquals(chunkCount, writeCount); + KeyData keyData = new KeyData(containerName, keyName); List chunkProtoList = new LinkedList<>(); for (ChunkInfo i : chunkList) { @@ -713,7 +746,7 @@ public void testDeleteKeyTwice() throws IOException, @Test public void testUpdateContainer() throws IOException { String containerName = OzoneUtils.getRequestID(); - ContainerData data = new ContainerData(containerName); + ContainerData data = new ContainerData(containerName, conf); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner", "bilbo"); @@ -724,7 +757,7 @@ public void testUpdateContainer() throws IOException { File orgContainerFile = containerManager.getContainerFile(data); Assert.assertTrue(orgContainerFile.exists()); - ContainerData newData = new ContainerData(containerName); + ContainerData newData = new ContainerData(containerName, conf); newData.addMetadata("VOLUME", "shire_new"); newData.addMetadata("owner", "bilbo_new"); @@ -757,7 +790,7 @@ public void testUpdateContainer() throws IOException { ContainerProtos.ContainerData actualContainerDataProto = ContainerProtos.ContainerData.parseDelimitedFrom(newIn); ContainerData actualContainerData = ContainerData - .getFromProtBuf(actualContainerDataProto); + .getFromProtBuf(actualContainerDataProto, conf); Assert.assertEquals(actualContainerData.getAllMetadata().get("VOLUME"), "shire_new"); Assert.assertEquals(actualContainerData.getAllMetadata().get("owner"), @@ -776,7 +809,7 @@ public void testUpdateContainer() throws IOException { } // Update with force flag, it should be success. - newData = new ContainerData(containerName); + newData = new ContainerData(containerName, conf); newData.addMetadata("VOLUME", "shire_new_1"); newData.addMetadata("owner", "bilbo_new_1"); containerManager.updateContainer(createSingleNodePipeline(containerName), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java index d804c46d39..8394a2cd97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java @@ -27,7 +27,7 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.container.replication .ContainerReplicationManager; import org.apache.hadoop.ozone.scm.container.replication.InProgressPool; @@ -145,7 +145,7 @@ public void testDetectSingleContainerReplica() throws TimeoutException, String threeNodeContainer = "ThreeNodeContainer"; InProgressPool ppool = replicationManager.getInProcessPoolList().get(0); // Only single datanode reporting that "SingleNodeContainer" exists. - List clist = + List clist = datanodeStateManager.getContainerReport(singleNodeContainer, ppool.getPool().getPoolName(), 1); ppool.handleContainerReport(clist.get(0)); @@ -154,7 +154,7 @@ public void testDetectSingleContainerReplica() throws TimeoutException, clist = datanodeStateManager.getContainerReport(threeNodeContainer, ppool.getPool().getPoolName(), 3); - for (ContainerReportsProto reportsProto : clist) { + for (ContainerReportsRequestProto reportsProto : clist) { ppool.handleContainerReport(reportsProto); } GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4, @@ -181,7 +181,7 @@ public void testDetectOverReplica() throws TimeoutException, String wayOverReplicated = "WayOverReplicated"; InProgressPool ppool = replicationManager.getInProcessPoolList().get(0); - List clist = + List clist = datanodeStateManager.getContainerReport(normalContainer, ppool.getPool().getPoolName(), 3); ppool.handleContainerReport(clist.get(0)); @@ -189,14 +189,14 @@ public void testDetectOverReplica() throws TimeoutException, clist = datanodeStateManager.getContainerReport(overReplicated, ppool.getPool().getPoolName(), 4); - for (ContainerReportsProto reportsProto : clist) { + for (ContainerReportsRequestProto reportsProto : clist) { ppool.handleContainerReport(reportsProto); } clist = datanodeStateManager.getContainerReport(wayOverReplicated, ppool.getPool().getPoolName(), 7); - for (ContainerReportsProto reportsProto : clist) { + for (ContainerReportsRequestProto reportsProto : clist) { ppool.handleContainerReport(reportsProto); } @@ -249,7 +249,7 @@ public void testAddingNewPoolWorks() // Assert that we are able to send a container report to this new // pool and datanode. - List clist = + List clist = datanodeStateManager.getContainerReport("NewContainer1", "PoolNew", 1); replicationManager.handleContainerReport(clist.get(0)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java index 50b3f57580..7b40f0fe04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMCli.java @@ -166,7 +166,7 @@ public void testDeleteContainer() throws Exception { pipeline = scm.allocateContainer(xceiverClientManager.getType(), OzoneProtos.ReplicationFactor.ONE, containerName); - containerData = new ContainerData(containerName); + containerData = new ContainerData(containerName, conf); containerManager.createContainer(pipeline, containerData); ContainerData cdata = containerManager.readContainer(containerName); KeyUtils.getDB(cdata, conf).put(containerName.getBytes(), @@ -207,7 +207,7 @@ public void testDeleteContainer() throws Exception { containerName = "empty-container"; pipeline = scm.allocateContainer(xceiverClientManager.getType(), xceiverClientManager.getFactor(), containerName); - containerData = new ContainerData(containerName); + containerData = new ContainerData(containerName, conf); containerManager.createContainer(pipeline, containerData); containerManager.closeContainer(containerName); Assert.assertTrue(containerExist(containerName)); @@ -271,7 +271,7 @@ public void testInfoContainer() throws Exception { cname = "ContainerTestInfo1"; Pipeline pipeline = scm.allocateContainer(xceiverClientManager.getType(), xceiverClientManager.getFactor(), cname); - ContainerData data = new ContainerData(cname); + ContainerData data = new ContainerData(cname, conf); containerManager.createContainer(pipeline, data); info = new String[]{"-container", "-info", "-c", cname}; @@ -292,7 +292,7 @@ public void testInfoContainer() throws Exception { cname = "ContainerTestInfo2"; pipeline = scm.allocateContainer(xceiverClientManager.getType(), xceiverClientManager.getFactor(), cname); - data = new ContainerData(cname); + data = new ContainerData(cname, conf); containerManager.createContainer(pipeline, data); KeyUtils.getDB(data, conf).put(cname.getBytes(), "someKey".getBytes()); @@ -313,7 +313,7 @@ public void testInfoContainer() throws Exception { cname = "ContainerTestInfo3"; pipeline = scm.allocateContainer(xceiverClientManager.getType(), xceiverClientManager.getFactor(), cname); - data = new ContainerData(cname); + data = new ContainerData(cname, conf); data.addMetadata("VOLUME", "shire"); data.addMetadata("owner", "bilbo"); containerManager.createContainer(pipeline, data); @@ -378,7 +378,7 @@ public void testListContainerCommand() throws Exception { String containerName = String.format("%s%02d", prefix, index); Pipeline pipeline = scm.allocateContainer(xceiverClientManager.getType(), xceiverClientManager.getFactor(), containerName); - ContainerData data = new ContainerData(containerName); + ContainerData data = new ContainerData(containerName, conf); containerManager.createContainer(pipeline, data); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java index cb532fc071..270929eabd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java @@ -24,7 +24,12 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos; + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; + import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; @@ -324,8 +329,7 @@ public void run() { * datanode. */ @Override - public VersionResponse getVersion(StorageContainerDatanodeProtocolProtos - .SCMVersionRequestProto versionRequest) { + public VersionResponse getVersion(SCMVersionRequestProto versionRequest) { return null; } @@ -347,11 +351,12 @@ public SCMCommand register(DatanodeID datanodeID) { * * @param datanodeID - Datanode ID. * @param nodeReport - node report. + * @param containerReportState - container report state. * @return SCMheartbeat response list */ @Override public List sendHeartbeat(DatanodeID datanodeID, - SCMNodeReport nodeReport) { + SCMNodeReport nodeReport, ReportState containerReportState) { if ((datanodeID != null) && (nodeReport != null) && (nodeReport .getStorageReportCount() > 0)) { SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString()); @@ -359,10 +364,8 @@ public List sendHeartbeat(DatanodeID datanodeID, long totalCapacity = 0L; long totalRemaining = 0L; long totalScmUsed = 0L; - List - storageReports = nodeReport.getStorageReportList(); - for (StorageContainerDatanodeProtocolProtos.SCMStorageReport report : - storageReports) { + List storageReports = nodeReport.getStorageReportList(); + for (SCMStorageReport report : storageReports) { totalCapacity += report.getCapacity(); totalRemaining += report.getRemaining(); totalScmUsed += report.getScmUsed(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java index e0b2b5b247..42a56276f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java @@ -28,7 +28,11 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos; + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.ozone.scm.container.ContainerMapping; import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity; @@ -64,6 +68,11 @@ public class TestContainerPlacement { public ExpectedException thrown = ExpectedException.none(); private static XceiverClientManager xceiverClientManager = new XceiverClientManager(new OzoneConfiguration()); + + private ReportState reportState = ReportState.newBuilder() + .setState(ReportState.states.noContainerReports) + .setCount(0).build(); + /** * Returns a new copy of Configuration. * @@ -128,16 +137,13 @@ public void testContainerPlacementCapacity() throws IOException, SCMTestUtils.getRegisteredDatanodeIDs(nodeManager, nodeCount); try { for (DatanodeID datanodeID : datanodes) { - StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb = - StorageContainerDatanodeProtocolProtos.SCMNodeReport.newBuilder(); - StorageContainerDatanodeProtocolProtos.SCMStorageReport.Builder srb = - StorageContainerDatanodeProtocolProtos.SCMStorageReport - .newBuilder(); + SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); srb.setStorageUuid(UUID.randomUUID().toString()); srb.setCapacity(capacity).setScmUsed(used). setRemaining(remaining).build(); nodeManager.sendHeartbeat(datanodeID, - nrb.addStorageReport(srb).build()); + nrb.addStorageReport(srb).build(), reportState); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), @@ -164,16 +170,13 @@ public void testContainerPlacementCapacity() throws IOException, final long newRemaining = capacity - newUsed; for (DatanodeID datanodeID : datanodes) { - StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb = - StorageContainerDatanodeProtocolProtos.SCMNodeReport.newBuilder(); - StorageContainerDatanodeProtocolProtos.SCMStorageReport.Builder srb = - StorageContainerDatanodeProtocolProtos.SCMStorageReport - .newBuilder(); + SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); srb.setStorageUuid(UUID.randomUUID().toString()); srb.setCapacity(capacity).setScmUsed(newUsed). setRemaining(newRemaining).build(); nodeManager.sendHeartbeat(datanodeID, - nrb.addStorageReport(srb).build()); + nrb.addStorageReport(srb).build(), reportState); } GenericTestUtils.waitFor(() -> nodeManager.getStats().getRemaining() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index 18279f012e..f81e6e6a29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -25,6 +25,8 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol.proto @@ -77,6 +79,10 @@ public class TestNodeManager { private File testDir; + private ReportState reportState = ReportState.newBuilder() + .setState(ReportState.states.noContainerReports) + .setCount(0).build(); + @Rule public ExpectedException thrown = ExpectedException.none(); @@ -141,7 +147,7 @@ public void testScmHeartbeat() throws IOException, // Send some heartbeats from different nodes. for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) { DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); - nodeManager.sendHeartbeat(datanodeID, null); + nodeManager.sendHeartbeat(datanodeID, null, reportState); } // Wait for 4 seconds max. @@ -187,7 +193,8 @@ public void testScmNotEnoughHeartbeats() throws IOException, // Need 100 nodes to come out of chill mode, only one node is sending HB. nodeManager.setMinimumChillModeNodes(100); - nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager), null); + nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager), + null, reportState); GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), 100, 4 * 1000); assertFalse("Not enough heartbeat, Node manager should have" + @@ -213,7 +220,7 @@ public void testScmSameNodeHeartbeats() throws IOException, // Send 10 heartbeat from same node, and assert we never leave chill mode. for (int x = 0; x < 10; x++) { - nodeManager.sendHeartbeat(datanodeID, null); + nodeManager.sendHeartbeat(datanodeID, null, reportState); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), @@ -242,7 +249,7 @@ public void testScmShutdown() throws IOException, InterruptedException, nodeManager.close(); // These should never be processed. - nodeManager.sendHeartbeat(datanodeID, null); + nodeManager.sendHeartbeat(datanodeID, null, reportState); // Let us just wait for 2 seconds to prove that HBs are not processed. Thread.sleep(2 * 1000); @@ -264,7 +271,8 @@ public void testScmHeartbeatAfterRestart() throws Exception { DatanodeID datanodeID = SCMTestUtils.getDatanodeID(); try (SCMNodeManager nodemanager = createNodeManager(conf)) { nodemanager.register(datanodeID); - List command = nodemanager.sendHeartbeat(datanodeID, null); + List command = nodemanager.sendHeartbeat(datanodeID, + null, reportState); Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeID)); Assert.assertTrue("On regular HB calls, SCM responses a " + "datanode with an empty command list", command.isEmpty()); @@ -282,7 +290,8 @@ public void testScmHeartbeatAfterRestart() throws Exception { GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { List command = - nodemanager.sendHeartbeat(datanodeID, null); + nodemanager.sendHeartbeat(datanodeID, null, + reportState); return command.size() == 1 && command.get(0).getType() .equals(Type.reregisterCommand); } @@ -312,7 +321,7 @@ public void testScmHealthyNodeCount() throws IOException, for (int x = 0; x < count; x++) { DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); - nodeManager.sendHeartbeat(datanodeID, null); + nodeManager.sendHeartbeat(datanodeID, null, reportState); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), 100, 4 * 1000); @@ -400,18 +409,18 @@ public void testScmDetectStaleAndDeadNode() throws IOException, DatanodeID staleNode = SCMTestUtils.getDatanodeID(nodeManager); // Heartbeat once - nodeManager.sendHeartbeat(staleNode, null); + nodeManager.sendHeartbeat(staleNode, null, reportState); // Heartbeat all other nodes. for (DatanodeID dn : nodeList) { - nodeManager.sendHeartbeat(dn, null); + nodeManager.sendHeartbeat(dn, null, reportState); } // Wait for 2 seconds .. and heartbeat good nodes again. Thread.sleep(2 * 1000); for (DatanodeID dn : nodeList) { - nodeManager.sendHeartbeat(dn, null); + nodeManager.sendHeartbeat(dn, null, reportState); } // Wait for 2 seconds, wait a total of 4 seconds to make sure that the @@ -428,7 +437,7 @@ public void testScmDetectStaleAndDeadNode() throws IOException, // heartbeat good nodes again. for (DatanodeID dn : nodeList) { - nodeManager.sendHeartbeat(dn, null); + nodeManager.sendHeartbeat(dn, null, reportState); } // 6 seconds is the dead window for this test , so we wait a total of @@ -466,7 +475,7 @@ public void testScmLogErrorOnNullDatanode() throws IOException, try (SCMNodeManager nodeManager = createNodeManager(getConf())) { GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); - nodeManager.sendHeartbeat(null, null); + nodeManager.sendHeartbeat(null, null, reportState); logCapturer.stopCapturing(); assertThat(logCapturer.getOutput(), containsString("Datanode ID in heartbeat is null")); @@ -542,9 +551,9 @@ public void testScmClusterIsInExpectedState1() throws IOException, SCMTestUtils.getDatanodeID(nodeManager, "StaleNode"); DatanodeID deadNode = SCMTestUtils.getDatanodeID(nodeManager, "DeadNode"); - nodeManager.sendHeartbeat(healthyNode, null); - nodeManager.sendHeartbeat(staleNode, null); - nodeManager.sendHeartbeat(deadNode, null); + nodeManager.sendHeartbeat(healthyNode, null, reportState); + nodeManager.sendHeartbeat(staleNode, null, reportState); + nodeManager.sendHeartbeat(deadNode, null, reportState); // Sleep so that heartbeat processing thread gets to run. Thread.sleep(500); @@ -570,12 +579,12 @@ public void testScmClusterIsInExpectedState1() throws IOException, * the 3 second windows. */ - nodeManager.sendHeartbeat(healthyNode, null); - nodeManager.sendHeartbeat(staleNode, null); - nodeManager.sendHeartbeat(deadNode, null); + nodeManager.sendHeartbeat(healthyNode, null, reportState); + nodeManager.sendHeartbeat(staleNode, null, reportState); + nodeManager.sendHeartbeat(deadNode, null, reportState); Thread.sleep(1500); - nodeManager.sendHeartbeat(healthyNode, null); + nodeManager.sendHeartbeat(healthyNode, null, reportState); Thread.sleep(2 * 1000); assertEquals(1, nodeManager.getNodeCount(HEALTHY)); @@ -595,10 +604,10 @@ public void testScmClusterIsInExpectedState1() throws IOException, * staleNode to move to stale state and deadNode to move to dead state. */ - nodeManager.sendHeartbeat(healthyNode, null); - nodeManager.sendHeartbeat(staleNode, null); + nodeManager.sendHeartbeat(healthyNode, null, reportState); + nodeManager.sendHeartbeat(staleNode, null, reportState); Thread.sleep(1500); - nodeManager.sendHeartbeat(healthyNode, null); + nodeManager.sendHeartbeat(healthyNode, null, reportState); Thread.sleep(2 * 1000); // 3.5 seconds have elapsed for stale node, so it moves into Stale. @@ -631,9 +640,9 @@ public void testScmClusterIsInExpectedState1() throws IOException, * Cluster State : let us heartbeat all the nodes and verify that we get * back all the nodes in healthy state. */ - nodeManager.sendHeartbeat(healthyNode, null); - nodeManager.sendHeartbeat(staleNode, null); - nodeManager.sendHeartbeat(deadNode, null); + nodeManager.sendHeartbeat(healthyNode, null, reportState); + nodeManager.sendHeartbeat(staleNode, null, reportState); + nodeManager.sendHeartbeat(deadNode, null, reportState); Thread.sleep(500); //Assert all nodes are healthy. assertEquals(3, nodeManager.getAllNodes().size()); @@ -653,7 +662,7 @@ private void heartbeatNodeSet(SCMNodeManager manager, List list, int sleepDuration) throws InterruptedException { while (!Thread.currentThread().isInterrupted()) { for (DatanodeID dn : list) { - manager.sendHeartbeat(dn, null); + manager.sendHeartbeat(dn, null, reportState); } Thread.sleep(sleepDuration); } @@ -739,7 +748,7 @@ public void testScmClusterIsInExpectedState2() throws IOException, // No Thread just one time HBs the node manager, so that these will be // marked as dead nodes eventually. for (DatanodeID dn : deadNodeList) { - nodeManager.sendHeartbeat(dn, null); + nodeManager.sendHeartbeat(dn, null, reportState); } @@ -893,7 +902,7 @@ public void testScmEnterAndExitChillMode() throws IOException, try (SCMNodeManager nodeManager = createNodeManager(conf)) { nodeManager.setMinimumChillModeNodes(10); DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); - nodeManager.sendHeartbeat(datanodeID, null); + nodeManager.sendHeartbeat(datanodeID, null, reportState); String status = nodeManager.getChillModeStatus(); Assert.assertThat(status, CoreMatchers.containsString("Still in chill " + "mode, waiting on nodes to report in.")); @@ -920,7 +929,7 @@ public void testScmEnterAndExitChillMode() throws IOException, // Assert that node manager force enter cannot be overridden by nodes HBs. for (int x = 0; x < 20; x++) { DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager); - nodeManager.sendHeartbeat(datanode, null); + nodeManager.sendHeartbeat(datanode, null, reportState); } Thread.sleep(500); @@ -963,7 +972,7 @@ public void testScmStatsFromNodeReport() throws IOException, srb.setCapacity(capacity).setScmUsed(used). setRemaining(capacity - used).build(); nodeManager.sendHeartbeat(datanodeID, - nrb.addStorageReport(srb).build()); + nrb.addStorageReport(srb).build(), reportState); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), 100, 4 * 1000); @@ -1010,7 +1019,7 @@ public void testScmNodeReportUpdate() throws IOException, .setRemaining(capacity - x * usedPerHeartbeat).build(); nrb.addStorageReport(srb); - nodeManager.sendHeartbeat(datanodeID, nrb.build()); + nodeManager.sendHeartbeat(datanodeID, nrb.build(), reportState); Thread.sleep(100); } @@ -1092,7 +1101,7 @@ public void testScmNodeReportUpdate() throws IOException, srb.setCapacity(capacity).setScmUsed(expectedScmUsed) .setRemaining(expectedRemaining).build(); nrb.addStorageReport(srb); - nodeManager.sendHeartbeat(datanodeID, nrb.build()); + nodeManager.sendHeartbeat(datanodeID, nrb.build(), reportState); // Wait up to 5 seconds so that the dead node becomes healthy // Verify usage info should be updated.