diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 1c61feb4c9..73d81f811d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -190,6 +190,29 @@ public final class ScmConfigKeys {
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1;
+ /**
+   * Don't start processing a pool if a minimum number of seconds has not
+   * elapsed since the last processing.
+ */
+ public static final String
+ OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS =
+ "ozone.scm.container.report.processing.interval.seconds";
+ public static final int
+ OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = 60;
+
+ /**
+   * These two settings control the number of threads in the executor pool
+   * and the timeout for container reports from all nodes.
+ */
+ public static final String OZONE_SCM_MAX_CONTAINER_REPORT_THREADS =
+ "ozone.scm.max.container.report.threads";
+ public static final int OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT = 100;
+ public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS =
+ "ozone.scm.container.reports.wait.timeout.seconds";
+ public static final int OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
+ 300; // Default 5 minute wait.
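+
+  // Example (illustrative sketch): these settings normally come from
+  // ozone-site.xml, but can also be overridden programmatically:
+  //   OzoneConfiguration conf = new OzoneConfiguration();
+  //   conf.setInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS, 200);
+  //   conf.setInt(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS, 600);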
+
/**
* Never constructed.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
new file mode 100644
index 0000000000..71836dbcc1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.node.CommandQueue;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.util.concurrent.Uninterruptibles
+ .sleepUninterruptibly;
+import static org.apache.hadoop.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+ .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+ .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;
+
+/**
+ * This class takes a set of container reports that belong to a pool and then
+ * computes the replication levels for each container.
+ */
+public class ContainerReplicationManager implements Closeable {
+ static final Logger LOG =
+ LoggerFactory.getLogger(ContainerReplicationManager.class);
+
+ private final NodePoolManager poolManager;
+ private final CommandQueue commandQueue;
+  private final HashSet<String> poolNames;
+  private final PriorityQueue<PeriodicPool> poolQueue;
+ private final NodeManager nodeManager;
+ private final int containerProcessingLag;
+ private final AtomicBoolean runnable;
+ private final ExecutorService executorService;
+ private final int maxPoolWait;
+ private long poolProcessCount;
+  private final List<InProgressPool> inProgressPoolList;
+ private final AtomicInteger threadFaultCount;
+
+ /**
+ * Returns the number of times we have processed pools.
+ * @return long
+ */
+ public long getPoolProcessCount() {
+ return poolProcessCount;
+ }
+
+ /**
+ * Constructs a class that computes Replication Levels.
+ *
+ * @param conf - OzoneConfiguration
+ * @param nodeManager - Node Manager
+ * @param poolManager - Pool Manager
+ * @param commandQueue - Datanodes Command Queue.
+ */
+ public ContainerReplicationManager(OzoneConfiguration conf,
+ NodeManager nodeManager, NodePoolManager poolManager,
+ CommandQueue commandQueue) {
+ Preconditions.checkNotNull(poolManager);
+ Preconditions.checkNotNull(commandQueue);
+ Preconditions.checkNotNull(nodeManager);
+    this.containerProcessingLag =
+        conf.getInt(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS,
+            OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT) * 1000;
+ int maxContainerReportThreads =
+ conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS,
+ OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT
+ );
+ this.maxPoolWait =
+ conf.getInt(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS,
+ OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT) * 1000;
+ this.poolManager = poolManager;
+ this.commandQueue = commandQueue;
+ this.nodeManager = nodeManager;
+ this.poolNames = new HashSet<>();
+ this.poolQueue = new PriorityQueue<>();
+ runnable = new AtomicBoolean(true);
+ this.threadFaultCount = new AtomicInteger(0);
+ executorService = HadoopExecutors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Container Reports Processing Thread - %d")
+ .build(), maxContainerReportThreads);
+ inProgressPoolList = new LinkedList<>();
+
+ initPoolProcessThread();
+ }
+
+ /**
+ * Returns the number of pools that are under process right now.
+ * @return int - Number of pools that are in process.
+ */
+ public int getInProgressPoolCount() {
+ return inProgressPoolList.size();
+ }
+
+ /**
+ * Exits the background thread.
+ */
+ public void setExit() {
+ this.runnable.set(false);
+ }
+
+ /**
+   * Adds or removes pools from the set of pool names that we process.
+   *
+   * There are two cases to handle: pools that have been added and pools
+   * that have been removed.
+ */
+ private void refreshPools() {
+    List<String> pools = this.poolManager.getNodePools();
+ if (pools != null) {
+
+      HashSet<String> removedPools =
+          computePoolDifference(this.poolNames, new HashSet<>(pools));
+
+      HashSet<String> addedPools =
+          computePoolDifference(new HashSet<>(pools), this.poolNames);
+ // TODO: Support remove pool API in pool manager so that this code
+ // path can be tested. This never happens in the current code base.
+      for (String poolName : removedPools) {
+        // Removing from the queue while iterating over it would throw a
+        // ConcurrentModificationException, so use removeIf instead.
+        poolQueue.removeIf(periodicPool ->
+            periodicPool.getPoolName().equals(poolName));
+      }
+ // Remove the pool names that we have in the list.
+ this.poolNames.removeAll(removedPools);
+
+ for (String poolName : addedPools) {
+ poolQueue.add(new PeriodicPool(poolName));
+ }
+
+ // Add to the pool names we are tracking.
+ poolNames.addAll(addedPools);
+ }
+
+ }
+
+  /**
+   * Computes the set difference between two pool name sets.
+   *
+   * @param newPools - New pools set.
+   * @param oldPool - Old pools set.
+   * @return Pool names present in newPools but not in oldPool.
+   */
+  private HashSet<String> computePoolDifference(HashSet<String> newPools,
+      Set<String> oldPool) {
+    Preconditions.checkNotNull(newPools);
+    Preconditions.checkNotNull(oldPool);
+    HashSet<String> newSet = new HashSet<>(newPools);
+    newSet.removeAll(oldPool);
+    return newSet;
+  }
+
+ private void initPoolProcessThread() {
+
+ /*
+ * Task that runs to check if we need to start a pool processing job.
+     * If so, we create a pool reconciliation job and find out if all the
+     * expected containers are on the nodes.
+ */
+ Runnable processPools = () -> {
+ while (runnable.get()) {
+ // Make sure that we don't have any new pools.
+ refreshPools();
+ PeriodicPool pool = poolQueue.poll();
+ if (pool != null) {
+ if (pool.getLastProcessedTime() + this.containerProcessingLag <
+ Time.monotonicNow()) {
+ LOG.debug("Adding pool {} to container processing queue", pool
+ .getPoolName());
+ InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
+ pool, this.nodeManager, this.poolManager, this.commandQueue,
+ this.executorService);
+ inProgressPool.startReconciliation();
+ inProgressPoolList.add(inProgressPool);
+ poolProcessCount++;
+
+ } else {
+
+ LOG.debug("Not within the time window for processing: {}",
+ pool.getPoolName());
+ // Put back this pool since we are not planning to process it.
+ poolQueue.add(pool);
+ // we might over sleep here, not a big deal.
+ sleepUninterruptibly(this.containerProcessingLag,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+ sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
+ }
+ };
+
+    startPoolProcessThread(processPools);
+  }
+
+  /**
+   * Starts the single pool processing thread. A thread that has died from
+   * an uncaught exception cannot be restarted via Thread#start(), so the
+   * uncaught exception handler spawns a fresh replacement thread instead.
+   */
+  private void startPoolProcessThread(Runnable processPools) {
+    Thread poolProcessThread = new Thread(processPools);
+    poolProcessThread.setDaemon(true);
+    poolProcessThread.setName("Pool replica thread");
+    poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      // Let us just restart this thread after logging a critical error.
+      // If this thread is not running, pool processing stops.
+      LOG.error("Critical Error : Pool replica thread encountered an " +
+          "error. Thread: {} Error Count : {}", t.toString(),
+          threadFaultCount.incrementAndGet(), e);
+      startPoolProcessThread(processPools);
+      // TODO : Add a config to restrict how many times we will restart this
+      // thread in a single session.
+    });
+    poolProcessThread.start();
+  }
+
+ /**
+   * Adds a container report to the appropriate in-progress pool.
+ * @param containerReport -- Container report for a specific container from
+ * a datanode.
+ */
+ public void handleContainerReport(ContainerReportsProto containerReport) {
+ String poolName = poolManager.getNodePool(
+ DatanodeID.getFromProtoBuf(containerReport.getDatanodeID()));
+
+    for (InProgressPool ppool : inProgressPoolList) {
+      if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
+ ppool.handleContainerReport(containerReport);
+ return;
+ }
+ }
+ // TODO: Decide if we can do anything else with this report.
+ LOG.debug("Discarding the container report for pool {}. That pool is not " +
+ "currently in the pool reconciliation process. Container Name: {}",
+ poolName, containerReport.getDatanodeID());
+ }
+
+ /**
+   * Get the in-progress pool list; used for testing.
+ * @return List of InProgressPool
+ */
+ @VisibleForTesting
+ public List getInProcessPoolList() {
+ return inProgressPoolList;
+ }
+
+ /**
+ * Shutdown the Container Replication Manager.
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ setExit();
+ HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
new file mode 100644
index 0000000000..87629e1307
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.node.CommandQueue;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static com.google.common.util.concurrent.Uninterruptibles
+ .sleepUninterruptibly;
+import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
+import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
+import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.UNKNOWN;
+
+/**
+ * An in-progress pool is a pool that is actively checking the replication
+ * status of its containers.
+ */
+public final class InProgressPool {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(InProgressPool.class);
+ private final PeriodicPool pool;
+ private final CommandQueue commandQueue;
+ private final NodeManager nodeManager;
+ private final NodePoolManager poolManager;
+ private final ExecutorService executorService;
+  private final Map<String, Integer> containerCountMap;
+  private final Map<String, Boolean> processedNodeSet;
+ private final long startTime;
+ private ProgressStatus status;
+ private AtomicInteger nodeCount;
+ private AtomicInteger nodeProcessed;
+ private AtomicInteger containerProcessedCount;
+ private int maxWaitTime;
+ /**
+   * Constructs a pool that is being processed.
+   *
+   * @param maxWaitTime - Maximum wait time in milliseconds.
+   * @param pool - Pool that we are working against.
+   * @param nodeManager - Node Manager.
+   * @param poolManager - Pool Manager.
+   * @param commandQueue - Command Queue.
+ * @param executorService - Shared Executor service.
+ */
+ InProgressPool(int maxWaitTime, PeriodicPool pool,
+ NodeManager nodeManager, NodePoolManager poolManager,
+ CommandQueue commandQueue, ExecutorService executorService) {
+ Preconditions.checkNotNull(pool);
+ Preconditions.checkNotNull(nodeManager);
+ Preconditions.checkNotNull(poolManager);
+ Preconditions.checkNotNull(commandQueue);
+ Preconditions.checkNotNull(executorService);
+ Preconditions.checkArgument(maxWaitTime > 0);
+ this.pool = pool;
+ this.nodeManager = nodeManager;
+ this.poolManager = poolManager;
+ this.commandQueue = commandQueue;
+ this.executorService = executorService;
+ this.containerCountMap = new ConcurrentHashMap<>();
+ this.processedNodeSet = new ConcurrentHashMap<>();
+ this.maxWaitTime = maxWaitTime;
+ startTime = Time.monotonicNow();
+ }
+
+ /**
+ * Returns periodic pool.
+ *
+ * @return PeriodicPool
+ */
+ public PeriodicPool getPool() {
+ return pool;
+ }
+
+ /**
+   * We are done if we have got reports from all nodes or we are done
+   * waiting for the specified time.
+   *
+   * @return true if we are done, false otherwise.
+   */
+  public boolean isDone() {
+    return (nodeCount.get() == nodeProcessed.get()) ||
+        (this.startTime + this.maxWaitTime) < Time.monotonicNow();
+ }
+
+ /**
+ * Gets the number of containers processed.
+ *
+ * @return int
+ */
+ public int getContainerProcessedCount() {
+ return containerProcessedCount.get();
+ }
+
+ /**
+ * Returns the start time in milliseconds.
+ *
+ * @return - Start Time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Get the number of nodes in this pool.
+ *
+ * @return - node count
+ */
+ public int getNodeCount() {
+ return nodeCount.get();
+ }
+
+ /**
+ * Get the number of nodes that we have already processed container reports
+ * from.
+ *
+ * @return - Processed count.
+ */
+ public int getNodeProcessed() {
+ return nodeProcessed.get();
+ }
+
+ /**
+ * Returns the current status.
+ *
+ * @return Status
+ */
+ public ProgressStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * Starts the reconciliation process for all the nodes in the pool.
+ */
+ public void startReconciliation() {
+    List<DatanodeID> datanodeIDList =
+ this.poolManager.getNodes(pool.getPoolName());
+    if (datanodeIDList.isEmpty()) {
+      LOG.error("Datanode list for {} is empty. Pool with no nodes?",
+ pool.getPoolName());
+ this.status = ProgressStatus.Error;
+ return;
+ }
+
+ nodeProcessed = new AtomicInteger(0);
+ containerProcessedCount = new AtomicInteger(0);
+ nodeCount = new AtomicInteger(0);
+    /*
+      Queue a command to each datanode in this pool, asking it to send us
+      its container report.
+    */
+ SendContainerCommand cmd = SendContainerCommand.newBuilder().build();
+ for (DatanodeID id : datanodeIDList) {
+ NodeManager.NODESTATE currentState = getNodestate(id);
+ if (currentState == HEALTHY || currentState == STALE) {
+ nodeCount.incrementAndGet();
+ // Queue commands to all datanodes in this pool to send us container
+ // report. Since we ignore dead nodes, it is possible that we would have
+ // over replicated the container if the node comes back.
+ commandQueue.addCommand(id, cmd);
+ }
+ }
+ this.status = ProgressStatus.InProgress;
+ this.getPool().setLastProcessedTime(Time.monotonicNow());
+ }
+
+ /**
+ * Gets the node state.
+ *
+ * @param id - datanode ID.
+ * @return NodeState.
+ */
+ private NodeManager.NODESTATE getNodestate(DatanodeID id) {
+ NodeManager.NODESTATE currentState = UNKNOWN;
+ int maxTry = 100;
+    // We need to loop to make sure that we will retry if we get
+    // node state unknown. This can lead to an infinite loop if we send
+    // in an unknown node ID, so a max try count is used to prevent it.
+
+ int currentTry = 0;
+ while (currentState == UNKNOWN && currentTry < maxTry) {
+ // Retry to make sure that we deal with the case of node state not
+ // known.
+ currentState = nodeManager.getNodeState(id);
+ currentTry++;
+ if (currentState == UNKNOWN) {
+ // Sleep to make sure that this is not a tight loop.
+ sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+ if (currentState == UNKNOWN) {
+ LOG.error("Not able to determine the state of Node: {}, Exceeded max " +
+ "try and node manager returns UNKNOWN state. This indicates we " +
+ "are dealing with a node that we don't know about.", id);
+ }
+ return currentState;
+ }
+
+ /**
+   * Queues a container report for handling. This is done in a worker thread
+   * since decoding a container report might be compute intensive. We don't
+   * want to block since we have asked for a bunch of container reports
+ * from a set of datanodes.
+ *
+ * @param containerReport - ContainerReport
+ */
+ public void handleContainerReport(ContainerReportsProto containerReport) {
+ executorService.submit(processContainerReport(containerReport));
+ }
+
+ private Runnable processContainerReport(ContainerReportsProto reports) {
+ return () -> {
+ DatanodeID datanodeID =
+ DatanodeID.getFromProtoBuf(reports.getDatanodeID());
+      // putIfAbsent returns null only for the first report from this node,
+      // so each datanode is counted exactly once.
+      if (processedNodeSet.putIfAbsent(datanodeID.getDatanodeUuid(),
+          Boolean.TRUE) == null) {
+ nodeProcessed.incrementAndGet();
+ LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed,
+ datanodeID.getDatanodeUuid());
+ for (ContainerInfo info : reports.getReportsList()) {
+ containerProcessedCount.incrementAndGet();
+ LOG.debug("Total Containers processed: {} Container Name: {}",
+ containerProcessedCount.get(), info.getContainerName());
+
+ // Update the container map with count + 1 if the key exists or
+ // update the map with 1. Since this is a concurrentMap the
+ // computation and update is atomic.
+ containerCountMap.merge(info.getContainerName(), 1, Integer::sum);
+ }
+ }
+ };
+ }
+
+ /**
+ * Filter the containers based on specific rules.
+ *
+ * @param predicate -- Predicate to filter by
+ * @return A list of map entries.
+ */
+  public List<Map.Entry<String, Integer>> filterContainer(
+      Predicate<Map.Entry<String, Integer>> predicate) {
+ return containerCountMap.entrySet().stream()
+ .filter(predicate).collect(Collectors.toList());
+ }
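+
+  // Example (illustrative sketch, hypothetical threshold): find containers
+  // reported by fewer than three datanodes, i.e. candidates for
+  // re-replication:
+  //   List<Map.Entry<String, Integer>> underReplicated =
+  //       pool.filterContainer(entry -> entry.getValue() < 3);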
+
+ /**
+   * Used only for testing; calling this will abort container report
+   * processing. This is a very dangerous call and should not be made by
+   * any user.
+ */
+ @VisibleForTesting
+ public void setDoneProcessing() {
+ nodeProcessed.set(nodeCount.get());
+ }
+
+ /**
+ * Returns the pool name.
+ *
+ * @return Name of the pool.
+ */
+ String getPoolName() {
+ return pool.getPoolName();
+ }
+
+ /**
+   * Current status of the replication computation.
+ */
+ public enum ProgressStatus {
+ InProgress, Done, Error
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/PeriodicPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/PeriodicPool.java
new file mode 100644
index 0000000000..35b1e7607b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/PeriodicPool.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container.replication;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A periodic pool is a pool with a time stamp; this allows us to process
+ * pools based on a cyclic clock.
+ */
+public class PeriodicPool implements Comparable<PeriodicPool> {
+ private final String poolName;
+ private long lastProcessedTime;
+ private AtomicLong totalProcessedCount;
+
+ /**
+ * Constructs a periodic pool.
+ *
+ * @param poolName - Name of the pool
+ */
+ public PeriodicPool(String poolName) {
+ this.poolName = poolName;
+ lastProcessedTime = 0;
+ totalProcessedCount = new AtomicLong(0);
+ }
+
+ /**
+ * Get pool Name.
+ * @return PoolName
+ */
+ public String getPoolName() {
+ return poolName;
+ }
+
+ /**
+ * Compares this object with the specified object for order. Returns a
+ * negative integer, zero, or a positive integer as this object is less
+ * than, equal to, or greater than the specified object.
+ *
+ * @param o the object to be compared.
+ * @return a negative integer, zero, or a positive integer as this object is
+ * less than, equal to, or greater than the specified object.
+ * @throws NullPointerException if the specified object is null
+ * @throws ClassCastException if the specified object's type prevents it
+ * from being compared to this object.
+ */
+ @Override
+ public int compareTo(PeriodicPool o) {
+ return Long.compare(this.lastProcessedTime, o.lastProcessedTime);
+ }
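+
+  // Note: this ordering is inconsistent with equals(), which compares pool
+  // names. That is acceptable here because PriorityQueue consults only
+  // compareTo(), so poll() returns the least recently processed pool first.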
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ PeriodicPool that = (PeriodicPool) o;
+
+ return poolName.equals(that.poolName);
+ }
+
+ @Override
+ public int hashCode() {
+ return poolName.hashCode();
+ }
+
+ /**
+ * Returns the Total Times we have processed this pool.
+ *
+ * @return processed count.
+ */
+ public long getTotalProcessedCount() {
+ return totalProcessedCount.get();
+ }
+
+ /**
+ * Gets the last time we processed this pool.
+ * @return time in milliseconds
+ */
+ public long getLastProcessedTime() {
+ return this.lastProcessedTime;
+ }
+
+
+  /**
+   * Sets the last processed time.
+   *
+   * @param lastProcessedTime - Time in milliseconds.
+   */
+ public void setLastProcessedTime(long lastProcessedTime) {
+ this.lastProcessedTime = lastProcessedTime;
+ }
+
+  /**
+ * Increments the total processed count.
+ */
+ public void incTotalProcessedCount() {
+ this.totalProcessedCount.incrementAndGet();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/package-info.java
new file mode 100644
index 0000000000..82e420272e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container.replication;
+/*
+ This package contains routines that manage replication of a container.
+ This relies on container reports to understand the replication level of a
+ container -- UnderReplicated, Replicated, OverReplicated -- and manages
+ the replication level based on that.
+ */
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
index 59326064ab..bbf319b674 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.ozone.scm.node;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
import java.util.HashMap;
import java.util.LinkedList;
@@ -35,18 +38,43 @@
* there where queued.
*/
public class CommandQueue {
-
-  private final Map<DatanodeID, List<SCMCommand>> commandMap;
- private final Lock lock;
- // This map is used as default return value.
+ // This list is used as default return value.
   private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>();
+  private final Map<DatanodeID, Commands> commandMap;
+ private final Lock lock;
+ private long commandsInQueue;
+
+ /**
+ * Returns number of commands in queue.
+ * @return Command Count.
+ */
+ public long getCommandsInQueue() {
+ return commandsInQueue;
+ }
/**
* Constructs a Command Queue.
+ * TODO : Add a flusher thread that throws away commands older than a certain
+ * time period.
*/
public CommandQueue() {
commandMap = new HashMap<>();
lock = new ReentrantLock();
+ commandsInQueue = 0;
+ }
+
+ /**
+ * This function is used only for test purposes.
+ */
+ @VisibleForTesting
+ public void clear() {
+ lock.lock();
+ try {
+ commandMap.clear();
+ commandsInQueue = 0;
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -61,8 +89,15 @@ public CommandQueue() {
   List<SCMCommand> getCommand(final DatanodeID datanodeID) {
lock.lock();
try {
-      List<SCMCommand> cmds = commandMap.remove(datanodeID);
- return cmds == null ? DEFAULT_LIST : cmds;
+      Commands cmds = commandMap.remove(datanodeID);
+      List<SCMCommand> cmdList = null;
+      if (cmds != null) {
+        cmdList = cmds.getCommands();
+        commandsInQueue -= cmdList.size();
+        // A post condition really.
+        Preconditions.checkState(commandsInQueue >= 0);
+      }
+      return cmds == null ? DEFAULT_LIST : cmdList;
} finally {
lock.unlock();
}
@@ -74,19 +109,82 @@ List getCommand(final DatanodeID datanodeID) {
* @param datanodeID DatanodeID
* @param command - Command
*/
- void addCommand(final DatanodeID datanodeID, final SCMCommand command) {
+  public void addCommand(final DatanodeID datanodeID,
+      final SCMCommand command) {
lock.lock();
try {
if (commandMap.containsKey(datanodeID)) {
commandMap.get(datanodeID).add(command);
} else {
-        LinkedList<SCMCommand> newList = new LinkedList<>();
- newList.add(command);
- commandMap.put(datanodeID, newList);
+ commandMap.put(datanodeID, new Commands(command));
}
+ commandsInQueue++;
} finally {
lock.unlock();
}
}
+ /**
+ * Class that stores commands for a datanode.
+ */
+ private static class Commands {
+ private long updateTime;
+ private long readTime;
+    private List<SCMCommand> commands;
+
+ /**
+ * Constructs a Commands class.
+ */
+ Commands() {
+ commands = new LinkedList<>();
+ updateTime = 0;
+ readTime = 0;
+ }
+
+ /**
+ * Creates the object and populates with the command.
+ * @param command command to add to queue.
+ */
+ Commands(SCMCommand command) {
+ this();
+ this.add(command);
+ }
+
+ /**
+     * Gets the last time the commands for this node were updated.
+ * @return Time stamp
+ */
+ public long getUpdateTime() {
+ return updateTime;
+ }
+
+ /**
+ * Gets the last read time.
+ * @return last time when these commands were read from this queue.
+ */
+ public long getReadTime() {
+ return readTime;
+ }
+
+ /**
+ * Adds a command to the list.
+ *
+ * @param command SCMCommand
+ */
+ public void add(SCMCommand command) {
+ this.commands.add(command);
+ updateTime = Time.monotonicNow();
+ }
+
+ /**
+ * Returns the commands for this datanode.
+ * @return command list.
+ */
+    public List<SCMCommand> getCommands() {
+      List<SCMCommand> temp = this.commands;
+ this.commands = new LinkedList<>();
+ readTime = Time.monotonicNow();
+ return temp;
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
index d4ca85f53c..3670f56e54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
@@ -109,7 +109,8 @@ public interface NodeManager extends StorageContainerNodeProtocol,
enum NODESTATE {
HEALTHY,
STALE,
- DEAD
+ DEAD,
+ UNKNOWN
}
/**
@@ -137,4 +138,11 @@ enum NODESTATE {
*/
@VisibleForTesting
boolean waitForHeartbeatProcessed();
+
+ /**
+ * Returns the node state of a specific node.
+ * @param id - DatanodeID
+   * @return Healthy/Stale/Dead/Unknown.
+ */
+ NODESTATE getNodeState(DatanodeID id);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
index 3ab72750ec..a71fbcca06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
@@ -376,6 +376,12 @@ public int getNodeCount(NODESTATE nodestate) {
return staleNodeCount.get();
case DEAD:
return deadNodeCount.get();
+ case UNKNOWN:
+      // UNKNOWN is a transient state: a node can be in transit between the
+      // other states, so returning a count for it is not possible. The
+      // state exists to deal with the fact that this information is not
+      // always consistent.
+ return 0;
default:
throw new IllegalArgumentException("Unknown node state requested.");
}
@@ -392,6 +398,37 @@ public boolean waitForHeartbeatProcessed() {
return lastHBcheckFinished != 0;
}
+ /**
+ * Returns the node state of a specific node.
+ *
+ * @param id - DatanodeID
+ * @return Healthy/Stale/Dead/Unknown.
+ */
+ @Override
+ public NODESTATE getNodeState(DatanodeID id) {
+    // There is a subtle race condition here, hence we also support
+    // NODESTATE.UNKNOWN. It is possible that just before we check the
+    // healthyNodes, we have removed the node from the healthy list but
+    // still not added it to the stale nodes list.
+    // We could fix that by adding the node to the stale list before
+    // removing it, but then the node would briefly be in two states. To
+    // avoid that, we just deal with the possibility of getting a state
+    // called unknown.
+
+    if (healthyNodes.containsKey(id.getDatanodeUuid())) {
+      return NODESTATE.HEALTHY;
+    }
+
+    if (staleNodes.containsKey(id.getDatanodeUuid())) {
+      return NODESTATE.STALE;
+    }
+
+    if (deadNodes.containsKey(id.getDatanodeUuid())) {
+      return NODESTATE.DEAD;
+    }
+
+ return NODESTATE.UNKNOWN;
+ }
+
/**
* This is the real worker thread that processes the HB queue. We do the
* following things in this thread.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index 212d9b737b..aa52979870 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -110,8 +110,9 @@ message ContainerReportsProto {
fullReport = 0;
deltaReport = 1;
}
- repeated ContainerInfo reports = 1;
- required reportType type = 2;
+ required DatanodeIDProto datanodeID = 1;
+ repeated ContainerInfo reports = 2;
+ required reportType type = 3;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java
new file mode 100644
index 0000000000..d0f440f0a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This class manages the state of datanodes
+ * in conjunction with the node pool and node managers.
+ */
+public class ReplicationDatanodeStateManager {
+ private final NodeManager nodeManager;
+ private final NodePoolManager poolManager;
+ private final Random r;
+
+ /**
+ * The datanode state Manager.
+ *
+   * @param nodeManager - Node Manager.
+   * @param poolManager - Node Pool Manager.
+ */
+ public ReplicationDatanodeStateManager(NodeManager nodeManager,
+ NodePoolManager poolManager) {
+ this.nodeManager = nodeManager;
+ this.poolManager = poolManager;
+ r = new Random();
+ }
+
+ /**
+ * Get Container Report as if it is from a datanode in the cluster.
+ * @param containerName - Container Name.
+ * @param poolName - Pool Name.
+ * @param dataNodeCount - Datanode Count.
+ * @return List of Container Reports.
+ */
+  public List<ContainerReportsProto> getContainerReport(String containerName,
+      String poolName, int dataNodeCount) {
+    List<ContainerReportsProto> containerList = new LinkedList<>();
+    List<DatanodeID> nodesInPool = poolManager.getNodes(poolName);
+
+ if (nodesInPool == null) {
+ return containerList;
+ }
+
+ if (nodesInPool.size() < dataNodeCount) {
+ throw new IllegalStateException("Not enough datanodes to create " +
+ "required container reports");
+ }
+
+    while (containerList.size() < dataNodeCount && !nodesInPool.isEmpty()) {
+ DatanodeID id = nodesInPool.get(r.nextInt(nodesInPool.size()));
+ nodesInPool.remove(id);
+ // We return container reports only for nodes that are healthy.
+ if (nodeManager.getNodeState(id) == NodeManager.NODESTATE.HEALTHY) {
+ ContainerInfo info = ContainerInfo.newBuilder()
+ .setContainerName(containerName)
+ .setFinalhash(DigestUtils.sha256Hex(containerName))
+ .build();
+ ContainerReportsProto containerReport = ContainerReportsProto
+ .newBuilder().addReports(info)
+ .setDatanodeID(id.getProtoBufMessage())
+ .setType(ContainerReportsProto.reportType.fullReport)
+ .build();
+ containerList.add(containerReport);
+ }
+ }
+ return containerList;
+ }
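+
+  // Example (illustrative sketch, hypothetical names): generate reports for
+  // container "c1" in pool "pool1" as if sent by three healthy datanodes:
+  //   List<ContainerReportsProto> reports =
+  //       stateManager.getContainerReport("c1", "pool1", 3);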
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
new file mode 100644
index 0000000000..e432c26908
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Node Manager to test replication.
+ */
+public class ReplicationNodeManagerMock implements NodeManager {
+  private final Map<DatanodeID, NODESTATE> nodeStateMap;
+
+ /**
+ * A list of Datanodes and current states.
+ * @param nodeState A node state map.
+ */
+  public ReplicationNodeManagerMock(Map<DatanodeID, NODESTATE> nodeState) {
+ Preconditions.checkNotNull(nodeState);
+ nodeStateMap = nodeState;
+ }
+
+ /**
+ * Get the minimum number of nodes to get out of chill mode.
+ *
+ * @return int
+ */
+ @Override
+ public int getMinimumChillModeNodes() {
+ return 0;
+ }
+
+ /**
+ * Reports if we have exited out of chill mode by discovering enough nodes.
+ *
+ * @return True if we are out of Node layer chill mode, false otherwise.
+ */
+ @Override
+ public boolean isOutOfNodeChillMode() {
+ return !nodeStateMap.isEmpty();
+ }
+
+ /**
+ * Returns a chill mode status string.
+ *
+ * @return String
+ */
+ @Override
+ public String getChillModeStatus() {
+ return null;
+ }
+
+ /**
+   * Returns the status of the manual chill mode flag.
+   *
+   * @return true if forceEnterChillMode has been called, false if
+   * forceExitChillMode has been called or the flag is not set, e.g. after
+   * clearChillModeFlag.
+ */
+ @Override
+ public boolean isInManualChillMode() {
+ return false;
+ }
+
+ /**
+   * Get the number of data nodes in all states.
+   *
+   * @return A map of node state to the number of nodes in that state.
+ */
+ @Override
+  public Map<String, Integer> getNodeCount() {
+ return null;
+ }
+
+ /**
+ * Removes a data node from the management of this Node Manager.
+ *
+ * @param node - DataNode.
+ * @throws UnregisteredNodeException
+ */
+ @Override
+ public void removeNode(DatanodeID node) throws UnregisteredNodeException {
+ nodeStateMap.remove(node);
+
+ }
+
+ /**
+   * Gets all live datanodes that are currently communicating with SCM.
+ *
+ * @param nodestate - State of the node
+ * @return List of Datanodes that are Heartbeating SCM.
+ */
+ @Override
+  public List<DatanodeID> getNodes(NODESTATE nodestate) {
+ return null;
+ }
+
+ /**
+ * Returns the Number of Datanodes that are communicating with SCM.
+ *
+ * @param nodestate - State of the node
+ * @return int -- count
+ */
+ @Override
+ public int getNodeCount(NODESTATE nodestate) {
+ return 0;
+ }
+
+ /**
+ * Get all datanodes known to SCM.
+ *
+ * @return List of DatanodeIDs known to SCM.
+ */
+ @Override
+  public List<DatanodeID> getAllNodes() {
+ return null;
+ }
+
+ /**
+ * Chill mode is the period when node manager waits for a minimum
+ * configured number of datanodes to report in. This is called chill mode
+ * to indicate the period before node manager gets into action.
+ *
+ * Forcefully exits the chill mode, even if we have not met the minimum
+ * criteria of the nodes reporting in.
+ */
+ @Override
+ public void forceExitChillMode() {
+
+ }
+
+ /**
+ * Forcefully enters chill mode, even if all minimum node conditions are met.
+ */
+ @Override
+ public void forceEnterChillMode() {
+
+ }
+
+ /**
+ * Clears the manual chill mode flag.
+ */
+ @Override
+ public void clearChillModeFlag() {
+
+ }
+
+ /**
+ * Returns the aggregated node stats.
+ *
+ * @return the aggregated node stats.
+ */
+ @Override
+ public SCMNodeStat getStats() {
+ return null;
+ }
+
+ /**
+ * Return a map of node stats.
+ *
+ * @return a map of individual node stats (live/stale but not dead).
+ */
+ @Override
+  public Map<String, SCMNodeStat> getNodeStats() {
+ return null;
+ }
+
+ /**
+ * Return the node stat of the specified datanode.
+ *
+ * @param datanodeID - datanode ID.
+   * @return node stat if it is live/stale, null if it is dead or doesn't exist.
+ */
+ @Override
+ public SCMNodeMetric getNodeStat(DatanodeID datanodeID) {
+ return null;
+ }
+
+ /**
+   * Waits for the heartbeat to be processed by the NodeManager.
+ *
+ * @return true if heartbeat has been processed.
+ */
+ @Override
+ public boolean waitForHeartbeatProcessed() {
+ return false;
+ }
+
+ /**
+ * Returns the node state of a specific node.
+ *
+ * @param id - DatanodeID
+   * @return The node state, or null if the node is not known.
+ */
+ @Override
+ public NODESTATE getNodeState(DatanodeID id) {
+ return nodeStateMap.get(id);
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+   *
+   * <p>As noted in {@link AutoCloseable#close()}, cases where the
+ * close may fail require careful attention. It is strongly advised
+ * to relinquish the underlying resources and to internally
+ * mark the {@code Closeable} as closed, prior to throwing
+ * the {@code IOException}.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ /**
+   * When an object implementing interface <code>Runnable</code> is used
+   * to create a thread, starting the thread causes the object's
+   * <code>run</code> method to be called in that separately executing
+   * thread.
+   *
+   * <p>The general contract of the method <code>run</code> is that it may
+   * take any action whatsoever.
+ *
+ * @see Thread#run()
+ */
+ @Override
+ public void run() {
+
+ }
+
+ /**
+ * Gets the version info from SCM.
+ *
+ * @param versionRequest - version Request.
+   * @return - returns SCM version info and other required information
+   * needed by the datanode.
+ */
+ @Override
+ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
+ return null;
+ }
+
+ /**
+ * Register the node if the node finds that it is not registered with any SCM.
+ *
+ * @param datanodeID - Send datanodeID with Node info, but datanode UUID is
+ * empty. Server returns a datanodeID for the given node.
+   * @return SCMCommand
+ */
+ @Override
+ public SCMCommand register(DatanodeID datanodeID) {
+ return null;
+ }
+
+ /**
+ * Send heartbeat to indicate the datanode is alive and doing well.
+ *
+ * @param datanodeID - Datanode ID.
+ * @param nodeReport - node report.
+   * @return SCM heartbeat response list.
+ */
+ @Override
+  public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
+ SCMNodeReport nodeReport) {
+ return null;
+ }
+
+ /**
+ * Clears all nodes from the node Manager.
+ */
+ public void clearMap() {
+ this.nodeStateMap.clear();
+ }
+
+ /**
+ * Adds a node to the existing Node manager. This is used only for test
+ * purposes.
+ * @param id - DatanodeID
+ * @param state State you want to put that node to.
+ */
+ public void addNode(DatanodeID id, NODESTATE state) {
+ nodeStateMap.put(id, state);
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodePoolManagerMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodePoolManagerMock.java
new file mode 100644
index 0000000000..a86a54c15a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodePoolManagerMock.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Pool Manager replication mock.
+ */
+public class ReplicationNodePoolManagerMock implements NodePoolManager {
+
+  private final Map<DatanodeID, String> nodeMemberShip;
+
+ /**
+ * A node pool manager for testing.
+ */
+ public ReplicationNodePoolManagerMock() {
+ nodeMemberShip = new HashMap<>();
+ }
+
+ /**
+ * Add a node to a node pool.
+ *
+ * @param pool - name of the node pool.
+ * @param node - data node.
+ */
+ @Override
+ public void addNode(String pool, DatanodeID node) {
+ nodeMemberShip.put(node, pool);
+ }
+
+ /**
+ * Remove a node from a node pool.
+ *
+ * @param pool - name of the node pool.
+ * @param node - data node.
+ * @throws SCMException
+ */
+ @Override
+ public void removeNode(String pool, DatanodeID node) throws SCMException {
+ nodeMemberShip.remove(node);
+
+ }
+
+ /**
+ * Get a list of known node pools.
+ *
+   * @return a list of known node pool names or an empty list if no node
+   * pool is defined.
+ */
+ @Override
+  public List<String> getNodePools() {
+    Set<String> poolSet = new HashSet<>();
+    for (Map.Entry<DatanodeID, String> entry : nodeMemberShip.entrySet()) {
+ poolSet.add(entry.getValue());
+ }
+ return new ArrayList<>(poolSet);
+
+ }
+
+ /**
+ * Get all nodes of a node pool given the name of the node pool.
+ *
+ * @param pool - name of the node pool.
+ * @return a list of datanode ids or an empty list if the node pool was not
+ * found.
+ */
+ @Override
+  public List<DatanodeID> getNodes(String pool) {
+    Set<DatanodeID> datanodeSet = new HashSet<>();
+    for (Map.Entry<DatanodeID, String> entry : nodeMemberShip.entrySet()) {
+ if (entry.getValue().equals(pool)) {
+ datanodeSet.add(entry.getKey());
+ }
+ }
+ return new ArrayList<>(datanodeSet);
+ }
+
+ /**
+ * Get the node pool name if the node has been added to a node pool.
+ *
+ * @param datanodeID - datanode ID.
+ * @return node pool name if it has been assigned. null if the node has not
+ * been assigned to any node pool yet.
+ */
+ @Override
+ public String getNodePool(DatanodeID datanodeID) {
+ return nodeMemberShip.get(datanodeID);
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+   *
+   * <p>As noted in {@link AutoCloseable#close()}, cases where the
+ * close may fail require careful attention. It is strongly advised
+ * to relinquish the underlying resources and to internally
+ * mark the {@code Closeable} as closed, prior to throwing
+ * the {@code IOException}.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/package-info.java
new file mode 100644
index 0000000000..ff597d58fa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+// Helper classes for ozone and container tests.
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
index 252ff670a1..2789275118 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -138,8 +139,7 @@ public static InetSocketAddress getReuseableAddress() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
socket.setReuseAddress(true);
int port = socket.getLocalPort();
- String addr = InetAddress.getLoopbackAddress().getHostAddress()
- .toString();
+ String addr = InetAddress.getLoopbackAddress().getHostAddress();
return new InetSocketAddress(addr, port);
}
}
@@ -148,6 +148,10 @@ public static Configuration getConf() {
return new Configuration();
}
+ public static OzoneConfiguration getOzoneConf() {
+ return new OzoneConfiguration();
+ }
+
public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
return getDatanodeID(nodeManager, UUID.randomUUID().toString());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 3c3f3b7985..63ada33d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -61,6 +61,8 @@
import java.util.UUID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.ozone.container.common.SCMTestUtils
+ .getDatanodeID;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports;
@@ -193,7 +195,7 @@ public void testGetVersionAssertRpcTimeOut() throws Exception {
public void testRegister() throws Exception {
String[] scmAddressArray = new String[1];
scmAddressArray[0] = serverAddress.toString();
- DatanodeID nodeToRegister = SCMTestUtils.getDatanodeID();
+ DatanodeID nodeToRegister = getDatanodeID();
try (EndpointStateMachine rpcEndPoint =
SCMTestUtils.createEndpoint(
SCMTestUtils.getConf(), serverAddress, 1000)) {
@@ -218,7 +220,7 @@ private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
if (!clearContainerID) {
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
.setClusterID(UUID.randomUUID().toString())
- .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+ .setDatanodeID(getDatanodeID().getProtoBufMessage())
.build();
endpointTask.setContainerNodeIDProto(containerNodeID);
}
@@ -272,7 +274,7 @@ public void testRegisterRpcTimeout() throws Exception {
@Test
public void testHeartbeat() throws Exception {
- DatanodeID dataNode = SCMTestUtils.getDatanodeID();
+ DatanodeID dataNode = getDatanodeID();
try (EndpointStateMachine rpcEndPoint =
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
@@ -299,7 +301,7 @@ private void heartbeatTaskHelper(InetSocketAddress scmAddress,
scmAddress, rpcTimeout)) {
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
.setClusterID(UUID.randomUUID().toString())
- .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+ .setDatanodeID(getDatanodeID().getProtoBufMessage())
.build();
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
@@ -365,6 +367,8 @@ ContainerReport getRandomContainerReport() {
reportsBuilder.addReports(getRandomContainerReport()
.getProtoBufMessage());
}
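+ // Every report must carry the ID of the datanode that generated it.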
+ reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID()
+ .getProtoBufMessage());
reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
.ContainerReportsProto.reportType.fullReport);
return reportsBuilder.build();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 35e358a9f6..3b53e8f818 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -19,6 +19,7 @@
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -354,7 +355,7 @@ private ChunkInfo writeChunkHelper(String containerName, String keyName,
pipeline.setContainerName(containerName);
ContainerData cData = new ContainerData(containerName);
cData.addMetadata("VOLUME", "shire");
- cData.addMetadata("owner)", "bilbo");
+ cData.addMetadata("owner", "bilbo");
if(!containerManager.getContainerMap()
.containsKey(containerName)) {
containerManager.createContainer(pipeline, cData);
@@ -773,7 +774,7 @@ private KeyData writeKeyHelper(Pipeline pipeline,
@Test
public void testListKey() throws Exception {
- String containerName = "c-0";
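+ // A random suffix keeps the container name unique, so repeated runs do
+ // not collide with containers created earlier.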
+ String containerName = "c0" + RandomStringUtils.randomAscii(10);
Pipeline pipeline = createSingleNodePipeline(containerName);
List<String> expectedKeys = new ArrayList<>();
for (int i = 0; i < 10; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java
new file mode 100644
index 0000000000..dc42a34cf7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.TestUtils
+ .ReplicationDatanodeStateManager;
+import org.apache.hadoop.ozone.container.TestUtils.ReplicationNodeManagerMock;
+import org.apache.hadoop.ozone.container.TestUtils
+ .ReplicationNodePoolManagerMock;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.container.replication
+ .ContainerReplicationManager;
+import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;
+import org.apache.hadoop.ozone.scm.node.CommandQueue;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.scm.ScmConfigKeys
+ .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS;
+import static org.apache.ratis.shaded.com.google.common.util.concurrent
+ .Uninterruptibles.sleepUninterruptibly;
+
+/**
+ * Tests for the container replication manager.
+ */
+public class TestContainerReplicationManager {
+ static final String POOL_NAME_TEMPLATE = "Pool%d";
+ static final int MAX_DATANODES = 72;
+ static final int POOL_SIZE = 24;
+ static final int POOL_COUNT = 3;
+ private LogCapturer logCapturer = LogCapturer.captureLogs(
+ LogFactory.getLog(ContainerReplicationManager.class));
+ private List<DatanodeID> datanodes = new LinkedList<>();
+ private NodeManager nodeManager;
+ private NodePoolManager poolManager;
+ private CommandQueue commandQueue;
+ private ContainerReplicationManager replicationManager;
+ private ReplicationDatanodeStateManager datanodeStateManager;
+
+ @After
+ public void tearDown() throws Exception {
+ logCapturer.stopCapturing();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ Map<DatanodeID, NodeManager.NODESTATE> nodeStateMap = new HashMap<>();
+ // We are setting up 3 pools with 24 nodes each in this cluster.
+ // First we create 72 Datanodes.
+ for (int x = 0; x < MAX_DATANODES; x++) {
+ DatanodeID datanode = SCMTestUtils.getDatanodeID();
+ datanodes.add(datanode);
+ nodeStateMap.put(datanode, NodeManager.NODESTATE.HEALTHY);
+ }
+
+ // All nodes in this cluster are healthy for the time being.
+ nodeManager = new ReplicationNodeManagerMock(nodeStateMap);
+ poolManager = new ReplicationNodePoolManagerMock();
+ commandQueue = new CommandQueue();
+
+ Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " +
+ "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES);
+
+ // Assign the datanodes to pools: pool y gets the contiguous slice of
+ // POOL_SIZE nodes starting at index (y - 1) * POOL_SIZE, so the pools
+ // stay disjoint.
+ for (int y = 1; y <= POOL_COUNT; y++) {
+ String poolName = String.format(POOL_NAME_TEMPLATE, y);
+ for (int z = 0; z < POOL_SIZE; z++) {
+ DatanodeID id = datanodes.get((y - 1) * POOL_SIZE + z);
+ poolManager.addNode(poolName, id);
+ }
+ }
+ OzoneConfiguration config = SCMTestUtils.getOzoneConf();
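+ // Shorten the container report wait timeout from its 300-second default
+ // so that in-progress pools finish quickly during the test.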
+ config.setInt(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS, 1);
+ replicationManager = new ContainerReplicationManager(config,
+ nodeManager, poolManager, commandQueue);
+ datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager,
+ poolManager);
+ // Sleep for one second to make sure all threads get time to run.
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Asserts that at least one pool is picked up for processing.
+ */
+ @Test
+ public void testAssertPoolsAreProcessed() {
+ // This asserts that replication manager has started processing at least
+ // one pool.
+ Assert.assertTrue(replicationManager.getInProgressPoolCount() > 0);
+
+ // Since all datanodes are flagged as healthy in this test, for each
+ // datanode we must have queued a command.
+ Assert.assertEquals("Commands are in queue :", commandQueue
+ .getCommandsInQueue(), POOL_SIZE * replicationManager
+ .getInProgressPoolCount());
+ }
+
+ /**
+ * This test sends container reports for two containers to a pool in
+ * progress. Asserts that we are able to find the container with a single
+ * replica and do not find the container with three replicas.
+ */
+ @Test
+ public void testDetectSingleContainerReplica() throws TimeoutException,
+ InterruptedException {
+ String singleNodeContainer = "SingleNodeContainer";
+ String threeNodeContainer = "ThreeNodeContainer";
+ InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
+ // Only single datanode reporting that "SingleNodeContainer" exists.
+ List<ContainerReportsProto> clist =
+ datanodeStateManager.getContainerReport(singleNodeContainer,
+ ppool.getPool().getPoolName(), 1);
+ ppool.handleContainerReport(clist.get(0));
+
+ // Three nodes are going to report that ThreeNodeContainer exists.
+ clist = datanodeStateManager.getContainerReport(threeNodeContainer,
+ ppool.getPool().getPoolName(), 3);
+
+ for (ContainerReportsProto reportsProto : clist) {
+ ppool.handleContainerReport(reportsProto);
+ }
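+ // One report for the single-replica container plus three for the
+ // three-replica container gives four processed reports in total.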
+ GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4,
+ 200, 1000);
+ ppool.setDoneProcessing();
+
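+ // Filter for containers that were reported by exactly one datanode.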
+ List<Map.Entry<String, Integer>> containers =
+ ppool.filterContainer(p -> p.getValue() == 1);
+ Assert.assertEquals(singleNodeContainer, containers.get(0).getKey());
+ int count = containers.get(0).getValue();
+ Assert.assertEquals(1, count);
+ }
+
+ /**
+ * We create three containers: a normal one, an over-replicated one and a
+ * way-over-replicated one. This test asserts that we are able to find
+ * the over-replicated containers.
+ */
+ @Test
+ public void testDetectOverReplica() throws TimeoutException,
+ InterruptedException {
+ String normalContainer = "NormalContainer";
+ String overReplicated = "OverReplicatedContainer";
+ String wayOverReplicated = "WayOverReplicated";
+ InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
+
+ List<ContainerReportsProto> clist =
+ datanodeStateManager.getContainerReport(normalContainer,
+ ppool.getPool().getPoolName(), 3);
+ ppool.handleContainerReport(clist.get(0));
+
+ clist = datanodeStateManager.getContainerReport(overReplicated,
+ ppool.getPool().getPoolName(), 4);
+
+ for (ContainerReportsProto reportsProto : clist) {
+ ppool.handleContainerReport(reportsProto);
+ }
+
+ clist = datanodeStateManager.getContainerReport(wayOverReplicated,
+ ppool.getPool().getPoolName(), 7);
+
+ for (ContainerReportsProto reportsProto : clist) {
+ ppool.handleContainerReport(reportsProto);
+ }
+
+ // Reports for the same container from the same datanode are ignored.
+ // Each of these three containers may land on overlapping sets of
+ // datanodes, so of the 3 + 4 + 7 = 14 reports we tolerate up to 4
+ // duplicates and only wait for more than 10 to be processed.
+ GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10,
+ 200, 1000);
+ ppool.setDoneProcessing();
+
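+ // Any container reported by more than three datanodes is over-replicated.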
+ List<Map.Entry<String, Integer>> containers =
+ ppool.filterContainer(p -> p.getValue() > 3);
+ Assert.assertEquals(2, containers.size());
+ }
+
+ /**
+ * This test verifies that all pools are picked up for replica processing.
+ */
+ @Test
+ public void testAllPoolsAreProcessed() throws TimeoutException,
+ InterruptedException {
+ // Verify that we saw all three pools being picked up for processing.
+ GenericTestUtils.waitFor(() -> replicationManager.getPoolProcessCount()
+ >= 3, 200, 15 * 1000);
+ Assert.assertTrue(logCapturer.getOutput().contains("Pool1") &&
+ logCapturer.getOutput().contains("Pool2") &&
+ logCapturer.getOutput().contains("Pool3"));
+ }
+
+ /**
+ * Adds a new pool and tests that we are able to pick up that new pool for
+ * processing as well as handle container reports for datanodes in that pool.
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testAddingNewPoolWorks() throws TimeoutException,
+ InterruptedException {
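+ // Capture InProgressPool logs at full verbosity so we can verify that
+ // the new container report is actually handled by the new pool.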
+ LogCapturer inProgressLog = LogCapturer.captureLogs(
+ LogFactory.getLog(InProgressPool.class));
+ GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.ALL);
+ try {
+ DatanodeID id = SCMTestUtils.getDatanodeID();
+ ((ReplicationNodeManagerMock) nodeManager).addNode(id,
+ NodeManager.NODESTATE.HEALTHY);
+ poolManager.addNode("PoolNew", id);
+ GenericTestUtils.waitFor(() ->
+ logCapturer.getOutput().contains("PoolNew"),
+ 200, 15 * 1000);
+
+ // Assert that we are able to send a container report to this new
+ // pool and datanode.
+ List<ContainerReportsProto> clist =
+ datanodeStateManager.getContainerReport("NewContainer1",
+ "PoolNew", 1);
+ replicationManager.handleContainerReport(clist.get(0));
+ GenericTestUtils.waitFor(() ->
+ inProgressLog.getOutput().contains("NewContainer1") && inProgressLog
+ .getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000);
+ } finally {
+ inProgressLog.stopCapturing();
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000000..318c54d958
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+// Test classes for replication.
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
index 16f2e3cbe5..c0bebf29d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
@@ -39,6 +39,7 @@
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.Iterator;
+import java.util.concurrent.TimeoutException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
@@ -53,12 +54,14 @@ public class TestSCMMXBean {
private static MBeanServer mbs;
@BeforeClass
- public static void init() throws IOException {
+ public static void init() throws IOException, TimeoutException,
+ InterruptedException {
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.build();
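+ // Wait for the cluster to be fully up before querying MBeans; otherwise
+ // the bean lookups can race with SCM startup.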
+ cluster.waitOzoneReady();
scm = cluster.getStorageContainerManager();
mbs = ManagementFactory.getPlatformMBeanServer();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
index 638de9ef58..e999ca2c45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
@@ -251,6 +251,17 @@ public boolean waitForHeartbeatProcessed() {
return false;
}
+ /**
+ * Returns the node state of a specific node.
+ *
+ * @param id - DatanodeID
+ * @return Healthy/Stale/Dead.
+ */
+ @Override
+ public NODESTATE getNodeState(DatanodeID id) {
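+ // This mock does not track per-node state.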
+ return null;
+ }
+
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.