diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 85407e65ce..df6fbf0c75 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -243,32 +243,6 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s"; - /** - * Don't start processing a pool if we have not had a minimum number of - * seconds from the last processing. - */ - public static final String OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL = - "ozone.scm.container.report.processing.interval"; - public static final String - OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = "60s"; - - /** - * This determines the total number of pools to be processed in parallel. - */ - public static final String OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS = - "ozone.scm.max.nodepool.processing.threads"; - public static final int OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT = 1; - /** - * These 2 settings control the number of threads in executor pool and time - * outs for thw container reports from all nodes. - */ - public static final String OZONE_SCM_MAX_CONTAINER_REPORT_THREADS = - "ozone.scm.max.container.report.threads"; - public static final int OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT = 100; - public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT = - "ozone.scm.container.reports.wait.timeout"; - public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT = - "5m"; public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY = "ozone.scm.block.deletion.max.retry"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index c40dc8e4ee..08a5ffdb87 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -91,7 +91,6 @@ public final class OzoneConsts { public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX; public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX; public static final String BLOCK_DB = "block.db"; - public static final String NODEPOOL_DB = "nodepool.db"; public static final String OPEN_CONTAINERS_DB = "openContainers.db"; public static final String DELETED_BLOCK_DB = "deletedBlock.db"; public static final String KSM_DB_NAME = "ksm.db"; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 7a91610c65..25365c8d9d 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -571,25 +571,6 @@ allocation. - - ozone.scm.container.report.processing.interval - 60s - OZONE, PERFORMANCE - Time interval for scm to process container reports - for a node pool. Scm handles node pool reports in a cyclic clock - manner, it fetches pools periodically with this time interval. - - - - ozone.scm.container.reports.wait.timeout - 300s - OZONE, PERFORMANCE, MANAGEMENT - Maximum time to wait in seconds for processing all container - reports from - a node pool. It determines the timeout for a - node pool report. - - ozone.scm.container.size.gb 5 @@ -792,17 +773,6 @@ The keytab file for Kerberos authentication in SCM. - - ozone.scm.max.container.report.threads - 100 - OZONE, PERFORMANCE - - Maximum number of threads to process container reports in scm. - Each container report from a data node is processed by scm in a worker - thread, fetched from a thread pool. This property is used to control the - maximum size of the thread pool. - - ozone.scm.max.hb.count.to.process 5000 @@ -814,14 +784,6 @@ for more info. - - ozone.scm.max.nodepool.processing.threads - 1 - OZONE, MANAGEMENT, PERFORMANCE - - Number of node pools to process in parallel. - - ozone.scm.names @@ -843,15 +805,6 @@ see ozone.scm.heartbeat.thread.interval before changing this value. - - ozone.scm.max.nodepool.processing.threads - 1 - OZONE, SCM - - Controls the number of node pools that can be processed in parallel by - Container Supervisor. - - ozone.trace.enabled false diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java new file mode 100644 index 0000000000..b83ecf13bc --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import java.util.List; +import java.util.PriorityQueue; +import java.util.Queue; + +/** + * Priority queue to handle under-replicated and over replicated containers + * in ozone. ReplicationManager will consume these messages and decide + * accordingly. + */ +public class ReplicationQueue { + + private final Queue queue; + + ReplicationQueue() { + queue = new PriorityQueue<>(); + } + + public synchronized boolean add(ReplicationReqMsg repObj) { + if (this.queue.contains(repObj)) { + // Remove the earlier message and insert this one + this.queue.remove(repObj); + return this.queue.add(repObj); + } else { + return this.queue.add(repObj); + } + } + + public synchronized boolean remove(ReplicationReqMsg repObj) { + return queue.remove(repObj); + } + + /** + * Retrieves, but does not remove, the head of this queue, + * or returns {@code null} if this queue is empty. + * + * @return the head of this queue, or {@code null} if this queue is empty + */ + public synchronized ReplicationReqMsg peek() { + return queue.peek(); + } + + /** + * Retrieves and removes the head of this queue, + * or returns {@code null} if this queue is empty. + * + * @return the head of this queue, or {@code null} if this queue is empty + */ + public synchronized ReplicationReqMsg poll() { + return queue.poll(); + } + + public synchronized boolean removeAll(List repObjs) { + return queue.removeAll(repObjs); + } + + public int size() { + return queue.size(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java new file mode 100644 index 0000000000..8d26fc368d --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import java.io.Serializable; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.math.NumberUtils; + +/** + * Wrapper class for hdds replication queue. Implements its natural + * ordering for priority queue. + */ +public class ReplicationReqMsg implements Comparable, + Serializable { + private final long containerId; + private final short replicationCount; + private final short expecReplicationCount; + private final long timestamp; + + public ReplicationReqMsg(long containerId, short replicationCount, + long timestamp, short expecReplicationCount) { + this.containerId = containerId; + this.replicationCount = replicationCount; + this.timestamp = timestamp; + this.expecReplicationCount = expecReplicationCount; + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less + * than, equal to, or greater than the specified object. + * @param o the object to be compared. + * @return a negative integer, zero, or a positive integer as this object + * is less than, equal to, or greater than the specified object. + * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + */ + @Override + public int compareTo(ReplicationReqMsg o) { + if (this == o) { + return 0; + } + if (o == null) { + return 1; + } + int retVal = NumberUtils + .compare(getReplicationCount() - getExpecReplicationCount(), + o.getReplicationCount() - o.getExpecReplicationCount()); + if (retVal != 0) { + return retVal; + } + return NumberUtils.compare(getTimestamp(), o.getTimestamp()); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(91, 1011) + .append(getContainerId()) + .toHashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReplicationReqMsg that = (ReplicationReqMsg) o; + return new EqualsBuilder().append(getContainerId(), that.getContainerId()) + .isEquals(); + } + + public long getContainerId() { + return containerId; + } + + public short getReplicationCount() { + return replicationCount; + } + + public long getTimestamp() { + return timestamp; + } + + public short getExpecReplicationCount() { + return expecReplicationCount; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java new file mode 100644 index 0000000000..7f335e37c1 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.replication; + +/** + * Ozone Container replicaton related classes. + */ \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java new file mode 100644 index 0000000000..39c61d32a0 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import java.util.Random; +import java.util.UUID; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for ReplicationQueue. + */ +public class TestReplicationQueue { + + private ReplicationQueue replicationQueue; + private Random random; + + @Before + public void setUp() { + replicationQueue = new ReplicationQueue(); + random = new Random(); + } + + @Test + public void testDuplicateAddOp() { + long contId = random.nextLong(); + String nodeId = UUID.randomUUID().toString(); + ReplicationReqMsg obj1, obj2, obj3; + long time = Time.monotonicNow(); + obj1 = new ReplicationReqMsg(contId, (short) 2, time, (short) 3); + obj2 = new ReplicationReqMsg(contId, (short) 2, time + 1, (short) 3); + obj3 = new ReplicationReqMsg(contId, (short) 1, time+2, (short) 3); + + replicationQueue.add(obj1); + replicationQueue.add(obj2); + replicationQueue.add(obj3); + Assert.assertEquals("Should add only 1 msg as second one is duplicate", + 1, replicationQueue.size()); + ReplicationReqMsg temp = replicationQueue.poll(); + Assert.assertEquals(temp, obj3); + } + + @Test + public void testPollOp() { + long contId = random.nextLong(); + String nodeId = UUID.randomUUID().toString(); + ReplicationReqMsg msg1, msg2, msg3, msg4, msg5; + msg1 = new ReplicationReqMsg(contId, (short) 1, Time.monotonicNow(), + (short) 3); + long time = Time.monotonicNow(); + msg2 = new ReplicationReqMsg(contId + 1, (short) 4, time, (short) 3); + msg3 = new ReplicationReqMsg(contId + 2, (short) 0, time, (short) 3); + msg4 = new ReplicationReqMsg(contId, (short) 2, time, (short) 3); + // Replication message for same container but different nodeId + msg5 = new ReplicationReqMsg(contId + 1, (short) 2, time, (short) 3); + + replicationQueue.add(msg1); + replicationQueue.add(msg2); + replicationQueue.add(msg3); + replicationQueue.add(msg4); + replicationQueue.add(msg5); + Assert.assertEquals("Should have 3 objects", + 3, replicationQueue.size()); + + // Since Priority queue orders messages according to replication count, + // message with lowest replication should be first + ReplicationReqMsg temp; + temp = replicationQueue.poll(); + Assert.assertEquals("Should have 2 objects", + 2, replicationQueue.size()); + Assert.assertEquals(temp, msg3); + + temp = replicationQueue.poll(); + Assert.assertEquals("Should have 1 objects", + 1, replicationQueue.size()); + Assert.assertEquals(temp, msg5); + + // Message 2 should be ordered before message 5 as both have same replication + // number but message 2 has earlier timestamp. + temp = replicationQueue.poll(); + Assert.assertEquals("Should have 0 objects", + replicationQueue.size(), 0); + Assert.assertEquals(temp, msg4); + } + + @Test + public void testRemoveOp() { + long contId = random.nextLong(); + String nodeId = UUID.randomUUID().toString(); + ReplicationReqMsg obj1, obj2, obj3; + obj1 = new ReplicationReqMsg(contId, (short) 1, Time.monotonicNow(), + (short) 3); + obj2 = new ReplicationReqMsg(contId + 1, (short) 2, Time.monotonicNow(), + (short) 3); + obj3 = new ReplicationReqMsg(contId + 2, (short) 3, Time.monotonicNow(), + (short) 3); + + replicationQueue.add(obj1); + replicationQueue.add(obj2); + replicationQueue.add(obj3); + Assert.assertEquals("Should have 3 objects", + 3, replicationQueue.size()); + + replicationQueue.remove(obj3); + Assert.assertEquals("Should have 2 objects", + 2, replicationQueue.size()); + + replicationQueue.remove(obj2); + Assert.assertEquals("Should have 1 objects", + 1, replicationQueue.size()); + + replicationQueue.remove(obj1); + Assert.assertEquals("Should have 0 objects", + 0, replicationQueue.size()); + } + +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java new file mode 100644 index 0000000000..5b1fd0f43a --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/** + * SCM Testing and Mocking Utils. + */ +package org.apache.hadoop.ozone.container.replication; +// Test classes for Replication functionality. \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index b563e90e76..9fd30f2ad0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; @@ -80,7 +79,6 @@ public class ContainerMapping implements Mapping { private final PipelineSelector pipelineSelector; private final ContainerStateManager containerStateManager; private final LeaseManager containerLeaseManager; - private final ContainerSupervisor containerSupervisor; private final float containerCloseThreshold; private final ContainerCloser closer; private final long size; @@ -127,9 +125,7 @@ public ContainerMapping( OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024; this.containerStateManager = new ContainerStateManager(conf, this); - this.containerSupervisor = - new ContainerSupervisor(conf, nodeManager, - nodeManager.getNodePoolManager()); + this.containerCloseThreshold = conf.getFloat( ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); @@ -407,8 +403,8 @@ public void processContainerReports(DatanodeDetails datanodeDetails, throws IOException { List containerInfos = reports.getReportsList(); - containerSupervisor.handleContainerReport(datanodeDetails, reports); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : + + for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : containerInfos) { byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID()); lock.lock(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java deleted file mode 100644 index 5bd05746bf..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container.replication; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.NodePoolManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.HadoopExecutors; -import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static com.google.common.util.concurrent.Uninterruptibles - .sleepUninterruptibly; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT; - -/** - * This class takes a set of container reports that belong to a pool and then - * computes the replication levels for each container. - */ -public class ContainerSupervisor implements Closeable { - public static final Logger LOG = - LoggerFactory.getLogger(ContainerSupervisor.class); - - private final NodePoolManager poolManager; - private final HashSet poolNames; - private final PriorityQueue poolQueue; - private final NodeManager nodeManager; - private final long containerProcessingLag; - private final AtomicBoolean runnable; - private final ExecutorService executorService; - private final long maxPoolWait; - private long poolProcessCount; - private final List inProgressPoolList; - private final AtomicInteger threadFaultCount; - private final int inProgressPoolMaxCount; - - private final ReadWriteLock inProgressPoolListLock; - - /** - * Returns the number of times we have processed pools. - * @return long - */ - public long getPoolProcessCount() { - return poolProcessCount; - } - - - /** - * Constructs a class that computes Replication Levels. - * - * @param conf - OzoneConfiguration - * @param nodeManager - Node Manager - * @param poolManager - Pool Manager - */ - public ContainerSupervisor(Configuration conf, NodeManager nodeManager, - NodePoolManager poolManager) { - Preconditions.checkNotNull(poolManager); - Preconditions.checkNotNull(nodeManager); - this.containerProcessingLag = - conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, - OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT, - TimeUnit.SECONDS - ) * 1000; - int maxContainerReportThreads = - conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS, - OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT - ); - this.maxPoolWait = - conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, - OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - this.inProgressPoolMaxCount = conf.getInt( - OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS, - OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT); - this.poolManager = poolManager; - this.nodeManager = nodeManager; - this.poolNames = new HashSet<>(); - this.poolQueue = new PriorityQueue<>(); - this.runnable = new AtomicBoolean(true); - this.threadFaultCount = new AtomicInteger(0); - this.executorService = newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Container Reports Processing Thread - %d") - .build(), maxContainerReportThreads); - this.inProgressPoolList = new LinkedList<>(); - this.inProgressPoolListLock = new ReentrantReadWriteLock(); - - initPoolProcessThread(); - } - - private ExecutorService newCachedThreadPool(ThreadFactory threadFactory, - int maxThreads) { - return new HadoopThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), threadFactory); - } - - /** - * Returns the number of pools that are under process right now. - * @return int - Number of pools that are in process. - */ - public int getInProgressPoolCount() { - return inProgressPoolList.size(); - } - - /** - * Exits the background thread. - */ - public void setExit() { - this.runnable.set(false); - } - - /** - * Adds or removes pools from names that we need to process. - * - * There are two different cases that we need to process. - * The case where some pools are being added and some times we have to - * handle cases where pools are removed. - */ - private void refreshPools() { - List pools = this.poolManager.getNodePools(); - if (pools != null) { - - HashSet removedPools = - computePoolDifference(this.poolNames, new HashSet<>(pools)); - - HashSet addedPools = - computePoolDifference(new HashSet<>(pools), this.poolNames); - // TODO: Support remove pool API in pool manager so that this code - // path can be tested. This never happens in the current code base. - for (String poolName : removedPools) { - for (PeriodicPool periodicPool : poolQueue) { - if (periodicPool.getPoolName().compareTo(poolName) == 0) { - poolQueue.remove(periodicPool); - } - } - } - // Remove the pool names that we have in the list. - this.poolNames.removeAll(removedPools); - - for (String poolName : addedPools) { - poolQueue.add(new PeriodicPool(poolName)); - } - - // Add to the pool names we are tracking. - poolNames.addAll(addedPools); - } - - } - - /** - * Handle the case where pools are added. - * - * @param newPools - New Pools list - * @param oldPool - oldPool List. - */ - private HashSet computePoolDifference(HashSet newPools, - Set oldPool) { - Preconditions.checkNotNull(newPools); - Preconditions.checkNotNull(oldPool); - HashSet newSet = new HashSet<>(newPools); - newSet.removeAll(oldPool); - return newSet; - } - - private void initPoolProcessThread() { - - /* - * Task that runs to check if we need to start a pool processing job. - * if so we create a pool reconciliation job and find out of all the - * expected containers are on the nodes. - */ - Runnable processPools = () -> { - while (runnable.get()) { - // Make sure that we don't have any new pools. - refreshPools(); - while (inProgressPoolList.size() < inProgressPoolMaxCount) { - PeriodicPool pool = poolQueue.poll(); - if (pool != null) { - if (pool.getLastProcessedTime() + this.containerProcessingLag > - Time.monotonicNow()) { - LOG.debug("Not within the time window for processing: {}", - pool.getPoolName()); - // we might over sleep here, not a big deal. - sleepUninterruptibly(this.containerProcessingLag, - TimeUnit.MILLISECONDS); - } - LOG.debug("Adding pool {} to container processing queue", - pool.getPoolName()); - InProgressPool inProgressPool = new InProgressPool(maxPoolWait, - pool, this.nodeManager, this.poolManager, this.executorService); - inProgressPool.startReconciliation(); - inProgressPoolListLock.writeLock().lock(); - try { - inProgressPoolList.add(inProgressPool); - } finally { - inProgressPoolListLock.writeLock().unlock(); - } - poolProcessCount++; - } else { - break; - } - } - sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS); - inProgressPoolListLock.readLock().lock(); - try { - for (InProgressPool inProgressPool : inProgressPoolList) { - inProgressPool.finalizeReconciliation(); - poolQueue.add(inProgressPool.getPool()); - } - } finally { - inProgressPoolListLock.readLock().unlock(); - } - inProgressPoolListLock.writeLock().lock(); - try { - inProgressPoolList.clear(); - } finally { - inProgressPoolListLock.writeLock().unlock(); - } - } - }; - - // We will have only one thread for pool processing. - Thread poolProcessThread = new Thread(processPools); - poolProcessThread.setDaemon(true); - poolProcessThread.setName("Pool replica thread"); - poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> { - // Let us just restart this thread after logging a critical error. - // if this thread is not running we cannot handle commands from SCM. - LOG.error("Critical Error : Pool replica thread encountered an " + - "error. Thread: {} Error Count : {}", t.toString(), e, - threadFaultCount.incrementAndGet()); - poolProcessThread.start(); - // TODO : Add a config to restrict how many times we will restart this - // thread in a single session. - }); - poolProcessThread.start(); - } - - /** - * Adds a container report to appropriate inProgress Pool. - * @param containerReport -- Container report for a specific container from - * a datanode. - */ - public void handleContainerReport(DatanodeDetails datanodeDetails, - ContainerReportsProto containerReport) { - inProgressPoolListLock.readLock().lock(); - try { - String poolName = poolManager.getNodePool(datanodeDetails); - for (InProgressPool ppool : inProgressPoolList) { - if (ppool.getPoolName().equalsIgnoreCase(poolName)) { - ppool.handleContainerReport(datanodeDetails, containerReport); - return; - } - } - // TODO: Decide if we can do anything else with this report. - LOG.debug("Discarding the container report for pool {}. " + - "That pool is not currently in the pool reconciliation process." + - " Container Name: {}", poolName, datanodeDetails); - } catch (SCMException e) { - LOG.warn("Skipping processing container report from datanode {}, " - + "cause: failed to get the corresponding node pool", - datanodeDetails.toString(), e); - } finally { - inProgressPoolListLock.readLock().unlock(); - } - } - - /** - * Get in process pool list, used for testing. - * @return List of InProgressPool - */ - @VisibleForTesting - public List getInProcessPoolList() { - return inProgressPoolList; - } - - /** - * Shutdown the Container Replication Manager. - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - setExit(); - HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS); - } -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java deleted file mode 100644 index 4b547311da..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container.replication; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.NodePoolManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerInfo; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -/** - * These are pools that are actively checking for replication status of the - * containers. - */ -public final class InProgressPool { - public static final Logger LOG = - LoggerFactory.getLogger(InProgressPool.class); - - private final PeriodicPool pool; - private final NodeManager nodeManager; - private final NodePoolManager poolManager; - private final ExecutorService executorService; - private final Map containerCountMap; - private final Map processedNodeSet; - private final long startTime; - private ProgressStatus status; - private AtomicInteger nodeCount; - private AtomicInteger nodeProcessed; - private AtomicInteger containerProcessedCount; - private long maxWaitTime; - /** - * Constructs an pool that is being processed. - * @param maxWaitTime - Maximum wait time in milliseconds. - * @param pool - Pool that we are working against - * @param nodeManager - Nodemanager - * @param poolManager - pool manager - * @param executorService - Shared Executor service. - */ - InProgressPool(long maxWaitTime, PeriodicPool pool, - NodeManager nodeManager, NodePoolManager poolManager, - ExecutorService executorService) { - Preconditions.checkNotNull(pool); - Preconditions.checkNotNull(nodeManager); - Preconditions.checkNotNull(poolManager); - Preconditions.checkNotNull(executorService); - Preconditions.checkArgument(maxWaitTime > 0); - this.pool = pool; - this.nodeManager = nodeManager; - this.poolManager = poolManager; - this.executorService = executorService; - this.containerCountMap = new ConcurrentHashMap<>(); - this.processedNodeSet = new ConcurrentHashMap<>(); - this.maxWaitTime = maxWaitTime; - startTime = Time.monotonicNow(); - } - - /** - * Returns periodic pool. - * - * @return PeriodicPool - */ - public PeriodicPool getPool() { - return pool; - } - - /** - * We are done if we have got reports from all nodes or we have - * done waiting for the specified time. - * - * @return true if we are done, false otherwise. - */ - public boolean isDone() { - return (nodeCount.get() == nodeProcessed.get()) || - (this.startTime + this.maxWaitTime) > Time.monotonicNow(); - } - - /** - * Gets the number of containers processed. - * - * @return int - */ - public int getContainerProcessedCount() { - return containerProcessedCount.get(); - } - - /** - * Returns the start time in milliseconds. - * - * @return - Start Time. - */ - public long getStartTime() { - return startTime; - } - - /** - * Get the number of nodes in this pool. - * - * @return - node count - */ - public int getNodeCount() { - return nodeCount.get(); - } - - /** - * Get the number of nodes that we have already processed container reports - * from. - * - * @return - Processed count. - */ - public int getNodeProcessed() { - return nodeProcessed.get(); - } - - /** - * Returns the current status. - * - * @return Status - */ - public ProgressStatus getStatus() { - return status; - } - - /** - * Starts the reconciliation process for all the nodes in the pool. - */ - public void startReconciliation() { - List datanodeDetailsList = - this.poolManager.getNodes(pool.getPoolName()); - if (datanodeDetailsList.size() == 0) { - LOG.error("Datanode list for {} is Empty. Pool with no nodes ? ", - pool.getPoolName()); - this.status = ProgressStatus.Error; - return; - } - - nodeProcessed = new AtomicInteger(0); - containerProcessedCount = new AtomicInteger(0); - nodeCount = new AtomicInteger(0); - this.status = ProgressStatus.InProgress; - this.getPool().setLastProcessedTime(Time.monotonicNow()); - } - - /** - * Queues a container Report for handling. This is done in a worker thread - * since decoding a container report might be compute intensive . We don't - * want to block since we have asked for bunch of container reports - * from a set of datanodes. - * - * @param containerReport - ContainerReport - */ - public void handleContainerReport(DatanodeDetails datanodeDetails, - ContainerReportsProto containerReport) { - if (status == ProgressStatus.InProgress) { - executorService.submit(processContainerReport(datanodeDetails, - containerReport)); - } else { - LOG.debug("Cannot handle container report when the pool is in {} status.", - status); - } - } - - private Runnable processContainerReport(DatanodeDetails datanodeDetails, - ContainerReportsProto reports) { - return () -> { - if (processedNodeSet.computeIfAbsent(datanodeDetails.getUuid(), - (k) -> true)) { - nodeProcessed.incrementAndGet(); - LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed, - datanodeDetails.getUuid()); - for (ContainerInfo info : reports.getReportsList()) { - containerProcessedCount.incrementAndGet(); - LOG.debug("Total Containers processed: {} Container Name: {}", - containerProcessedCount.get(), info.getContainerID()); - - // Update the container map with count + 1 if the key exists or - // update the map with 1. Since this is a concurrentMap the - // computation and update is atomic. - containerCountMap.merge(info.getContainerID(), 1, Integer::sum); - } - } - }; - } - - /** - * Filter the containers based on specific rules. - * - * @param predicate -- Predicate to filter by - * @return A list of map entries. - */ - public List> filterContainer( - Predicate> predicate) { - return containerCountMap.entrySet().stream() - .filter(predicate).collect(Collectors.toList()); - } - - /** - * Used only for testing, calling this will abort container report - * processing. This is very dangerous call and should not be made by any users - */ - @VisibleForTesting - public void setDoneProcessing() { - nodeProcessed.set(nodeCount.get()); - } - - /** - * Returns the pool name. - * - * @return Name of the pool. - */ - String getPoolName() { - return pool.getPoolName(); - } - - public void finalizeReconciliation() { - status = ProgressStatus.Done; - //TODO: Add finalizing logic. This is where actual reconciliation happens. - } - - /** - * Current status of the computing replication status. - */ - public enum ProgressStatus { - InProgress, Done, Error - } -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java deleted file mode 100644 index ef28aa78d0..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container.replication; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * Periodic pool is a pool with a time stamp, this allows us to process pools - * based on a cyclic clock. - */ -public class PeriodicPool implements Comparable { - private final String poolName; - private long lastProcessedTime; - private AtomicLong totalProcessedCount; - - /** - * Constructs a periodic pool. - * - * @param poolName - Name of the pool - */ - public PeriodicPool(String poolName) { - this.poolName = poolName; - lastProcessedTime = 0; - totalProcessedCount = new AtomicLong(0); - } - - /** - * Get pool Name. - * @return PoolName - */ - public String getPoolName() { - return poolName; - } - - /** - * Compares this object with the specified object for order. Returns a - * negative integer, zero, or a positive integer as this object is less - * than, equal to, or greater than the specified object. - * - * @param o the object to be compared. - * @return a negative integer, zero, or a positive integer as this object is - * less than, equal to, or greater than the specified object. - * @throws NullPointerException if the specified object is null - * @throws ClassCastException if the specified object's type prevents it - * from being compared to this object. - */ - @Override - public int compareTo(PeriodicPool o) { - return Long.compare(this.lastProcessedTime, o.lastProcessedTime); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PeriodicPool that = (PeriodicPool) o; - - return poolName.equals(that.poolName); - } - - @Override - public int hashCode() { - return poolName.hashCode(); - } - - /** - * Returns the Total Times we have processed this pool. - * - * @return processed count. - */ - public long getTotalProcessedCount() { - return totalProcessedCount.get(); - } - - /** - * Gets the last time we processed this pool. - * @return time in milliseconds - */ - public long getLastProcessedTime() { - return this.lastProcessedTime; - } - - - /** - * Sets the last processed time. - * - * @param lastProcessedTime - Long in milliseconds. - */ - - public void setLastProcessedTime(long lastProcessedTime) { - this.lastProcessedTime = lastProcessedTime; - } - - /* - * Increments the total processed count. - */ - public void incTotalProcessedCount() { - this.totalProcessedCount.incrementAndGet(); - } -} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java deleted file mode 100644 index 7bbe2efe57..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container.replication; -/* - This package contains routines that manage replication of a container. This - relies on container reports to understand the replication level of a - container - UnderReplicated, Replicated, OverReplicated -- and manages the - replication level based on that. - */ \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index 4392633b16..72d7e946cc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -123,12 +123,6 @@ public interface NodeManager extends StorageContainerNodeProtocol, */ SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails); - /** - * Returns the NodePoolManager associated with the NodeManager. - * @return NodePoolManager - */ - NodePoolManager getNodePoolManager(); - /** * Wait for the heartbeat is processed by NodeManager. * @return true if heartbeat has been processed. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java deleted file mode 100644 index 46faf9ca4d..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.node; - -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/** - * Interface that defines SCM NodePoolManager. - */ -public interface NodePoolManager extends Closeable { - - /** - * Add a node to a node pool. - * @param pool - name of the node pool. - * @param node - data node. - */ - void addNode(String pool, DatanodeDetails node) throws IOException; - - /** - * Remove a node from a node pool. - * @param pool - name of the node pool. - * @param node - data node. - * @throws SCMException - */ - void removeNode(String pool, DatanodeDetails node) - throws SCMException; - - /** - * Get a list of known node pools. - * @return a list of known node pool names or an empty list if not node pool - * is defined. - */ - List getNodePools(); - - /** - * Get all nodes of a node pool given the name of the node pool. - * @param pool - name of the node pool. - * @return a list of datanode ids or an empty list if the node pool was not - * found. - */ - List getNodes(String pool); - - /** - * Get the node pool name if the node has been added to a node pool. - * @param datanodeDetails - datanode ID. - * @return node pool name if it has been assigned. - * null if the node has not been assigned to any node pool yet. - */ - String getNodePool(DatanodeDetails datanodeDetails) throws SCMException; -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index fc8b0137f3..adca8eae0c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -53,7 +53,6 @@ import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; -import com.google.protobuf.GeneratedMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,7 +158,6 @@ public class SCMNodeManager private ObjectName nmInfoBean; // Node pool manager. - private final SCMNodePoolManager nodePoolManager; private final StorageContainerManager scmManager; public static final Event DATANODE_COMMAND = @@ -210,7 +208,6 @@ public SCMNodeManager(OzoneConfiguration conf, String clusterID, registerMXBean(); - this.nodePoolManager = new SCMNodePoolManager(conf); this.scmManager = scmManager; } @@ -682,7 +679,6 @@ private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) { @Override public void close() throws IOException { unregisterMXBean(); - nodePoolManager.close(); executorService.shutdown(); try { if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { @@ -760,20 +756,6 @@ public RegisteredCommand register( LOG.info("Leaving startup chill mode."); } - // TODO: define node pool policy for non-default node pool. - // For now, all nodes are added to the "DefaultNodePool" upon registration - // if it has not been added to any node pool yet. - try { - if (nodePoolManager.getNodePool(datanodeDetails) == null) { - nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, - datanodeDetails); - } - } catch (IOException e) { - // TODO: make sure registration failure is handled correctly. - return RegisteredCommand.newBuilder() - .setErrorCode(ErrorCode.errorNodeNotPermitted) - .build(); - } // Updating Node Report, as registration is successful updateNodeStat(datanodeDetails.getUuid(), nodeReport); LOG.info("Data node with ID: {} Registered.", @@ -859,11 +841,6 @@ public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { return new SCMNodeMetric(nodeStats.get(datanodeDetails.getUuid())); } - @Override - public NodePoolManager getNodePoolManager() { - return nodePoolManager; - } - @Override public Map getNodeCount() { Map nodeCountMap = new HashMap(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java deleted file mode 100644 index faf330ea1d..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.node; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataStoreBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_DB_CACHE_SIZE_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_DB_CACHE_SIZE_MB; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes - .FAILED_TO_FIND_NODE_IN_POOL; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes - .FAILED_TO_LOAD_NODEPOOL; -import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; -import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB; - -/** - * SCM node pool manager that manges node pools. - */ -public final class SCMNodePoolManager implements NodePoolManager { - - private static final Logger LOG = - LoggerFactory.getLogger(SCMNodePoolManager.class); - private static final List EMPTY_NODE_LIST = - new ArrayList<>(); - private static final List EMPTY_NODEPOOL_LIST = new ArrayList<>(); - public static final String DEFAULT_NODEPOOL = "DefaultNodePool"; - - // DB that saves the node to node pool mapping. - private MetadataStore nodePoolStore; - - // In-memory node pool to nodes mapping - private HashMap> nodePools; - - // Read-write lock for nodepool operations - private ReadWriteLock lock; - - /** - * Construct SCMNodePoolManager class that manages node to node pool mapping. - * @param conf - configuration. - * @throws IOException - */ - public SCMNodePoolManager(final OzoneConfiguration conf) - throws IOException { - final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, - OZONE_SCM_DB_CACHE_SIZE_DEFAULT); - File metaDir = getOzoneMetaDirPath(conf); - String scmMetaDataDir = metaDir.getPath(); - File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB); - nodePoolStore = MetadataStoreBuilder.newBuilder() - .setConf(conf) - .setDbFile(nodePoolDBPath) - .setCacheSize(cacheSize * OzoneConsts.MB) - .build(); - nodePools = new HashMap<>(); - lock = new ReentrantReadWriteLock(); - init(); - } - - /** - * Initialize the in-memory store based on persist store from level db. - * No lock is needed as init() is only invoked by constructor. - * @throws SCMException - */ - private void init() throws SCMException { - try { - nodePoolStore.iterate(null, (key, value) -> { - try { - DatanodeDetails nodeId = DatanodeDetails.getFromProtoBuf( - HddsProtos.DatanodeDetailsProto.PARSER.parseFrom(key)); - String poolName = DFSUtil.bytes2String(value); - - Set nodePool = null; - if (nodePools.containsKey(poolName)) { - nodePool = nodePools.get(poolName); - } else { - nodePool = new HashSet<>(); - nodePools.put(poolName, nodePool); - } - nodePool.add(nodeId); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding node: {} to node pool: {}", - nodeId, poolName); - } - } catch (IOException e) { - LOG.warn("Can't add a datanode to node pool, continue next..."); - } - return true; - }); - } catch (IOException e) { - LOG.error("Loading node pool error " + e); - throw new SCMException("Failed to load node pool", - FAILED_TO_LOAD_NODEPOOL); - } - } - - /** - * Add a datanode to a node pool. - * @param pool - name of the node pool. - * @param node - name of the datanode. - */ - @Override - public void addNode(final String pool, final DatanodeDetails node) - throws IOException { - Preconditions.checkNotNull(pool, "pool name is null"); - Preconditions.checkNotNull(node, "node is null"); - lock.writeLock().lock(); - try { - // add to the persistent store - nodePoolStore.put(node.getProtoBufMessage().toByteArray(), - DFSUtil.string2Bytes(pool)); - - // add to the in-memory store - Set nodePool = null; - if (nodePools.containsKey(pool)) { - nodePool = nodePools.get(pool); - } else { - nodePool = new HashSet(); - nodePools.put(pool, nodePool); - } - nodePool.add(node); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Remove a datanode from a node pool. - * @param pool - name of the node pool. - * @param node - datanode id. - * @throws SCMException - */ - @Override - public void removeNode(final String pool, final DatanodeDetails node) - throws SCMException { - Preconditions.checkNotNull(pool, "pool name is null"); - Preconditions.checkNotNull(node, "node is null"); - lock.writeLock().lock(); - try { - // Remove from the persistent store - byte[] kName = node.getProtoBufMessage().toByteArray(); - byte[] kData = nodePoolStore.get(kName); - if (kData == null) { - throw new SCMException(String.format("Unable to find node %s from" + - " pool %s in DB.", DFSUtil.bytes2String(kName), pool), - FAILED_TO_FIND_NODE_IN_POOL); - } - nodePoolStore.delete(kName); - - // Remove from the in-memory store - if (nodePools.containsKey(pool)) { - Set nodePool = nodePools.get(pool); - nodePool.remove(node); - } else { - throw new SCMException(String.format("Unable to find node %s from" + - " pool %s in MAP.", DFSUtil.bytes2String(kName), pool), - FAILED_TO_FIND_NODE_IN_POOL); - } - } catch (IOException e) { - throw new SCMException("Failed to remove node " + node.toString() - + " from node pool " + pool, e, - SCMException.ResultCodes.IO_EXCEPTION); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Get all the node pools. - * @return all the node pools. - */ - @Override - public List getNodePools() { - lock.readLock().lock(); - try { - if (!nodePools.isEmpty()) { - return nodePools.keySet().stream().collect(Collectors.toList()); - } else { - return EMPTY_NODEPOOL_LIST; - } - } finally { - lock.readLock().unlock(); - } - } - - /** - * Get all datanodes of a specific node pool. - * @param pool - name of the node pool. - * @return all datanodes of the specified node pool. - */ - @Override - public List getNodes(final String pool) { - Preconditions.checkNotNull(pool, "pool name is null"); - if (nodePools.containsKey(pool)) { - return nodePools.get(pool).stream().collect(Collectors.toList()); - } else { - return EMPTY_NODE_LIST; - } - } - - /** - * Get the node pool name if the node has been added to a node pool. - * @param datanodeDetails - datanode ID. - * @return node pool name if it has been assigned. - * null if the node has not been assigned to any node pool yet. - * TODO: Put this in a in-memory map if performance is an issue. - */ - @Override - public String getNodePool(final DatanodeDetails datanodeDetails) - throws SCMException { - Preconditions.checkNotNull(datanodeDetails, "node is null"); - try { - byte[] result = nodePoolStore.get( - datanodeDetails.getProtoBufMessage().toByteArray()); - return result == null ? null : DFSUtil.bytes2String(result); - } catch (IOException e) { - throw new SCMException("Failed to get node pool for node " - + datanodeDetails.toString(), e, - SCMException.ResultCodes.IO_EXCEPTION); - } - } - - /** - * Close node pool level db store. - * @throws IOException - */ - @Override - public void close() throws IOException { - nodePoolStore.close(); - } -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 8c59462b40..80b5d6e182 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -19,7 +19,6 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.NodePoolManager; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -273,11 +272,6 @@ public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { return new SCMNodeMetric(nodeMetricMap.get(datanodeDetails.getUuid())); } - @Override - public NodePoolManager getNodePoolManager() { - return Mockito.mock(NodePoolManager.class); - } - /** * Used for testing. * diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java deleted file mode 100644 index 8f412dedda..0000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.node; - -import org.apache.commons.collections.ListUtils; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.container.placement.algorithms - .ContainerPlacementPolicy; -import org.apache.hadoop.hdds.scm.container.placement.algorithms - .SCMContainerPlacementCapacity; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.test.PathUtils; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Test for SCM node pool manager. - */ -public class TestSCMNodePoolManager { - private static final Logger LOG = - LoggerFactory.getLogger(TestSCMNodePoolManager.class); - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - private final File testDir = PathUtils.getTestDir( - TestSCMNodePoolManager.class); - - SCMNodePoolManager createNodePoolManager(OzoneConfiguration conf) - throws IOException { - conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, - testDir.getAbsolutePath()); - conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, - SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class); - return new SCMNodePoolManager(conf); - } - - /** - * Test default node pool. - * - * @throws IOException - */ - @Test - public void testDefaultNodePool() throws IOException { - OzoneConfiguration conf = new OzoneConfiguration(); - try { - final String defaultPool = "DefaultPool"; - NodePoolManager npMgr = createNodePoolManager(conf); - - final int nodeCount = 4; - final List nodes = TestUtils - .getListOfDatanodeDetails(nodeCount); - assertEquals(0, npMgr.getNodePools().size()); - for (DatanodeDetails node: nodes) { - npMgr.addNode(defaultPool, node); - } - List nodesRetrieved = npMgr.getNodes(defaultPool); - assertEquals(nodeCount, nodesRetrieved.size()); - assertTwoDatanodeListsEqual(nodes, nodesRetrieved); - - DatanodeDetails nodeRemoved = nodes.remove(2); - npMgr.removeNode(defaultPool, nodeRemoved); - List nodesAfterRemove = npMgr.getNodes(defaultPool); - assertTwoDatanodeListsEqual(nodes, nodesAfterRemove); - - List nonExistSet = npMgr.getNodes("NonExistSet"); - assertEquals(0, nonExistSet.size()); - } finally { - FileUtil.fullyDelete(testDir); - } - } - - - /** - * Test default node pool reload. - * - * @throws IOException - */ - @Test - public void testDefaultNodePoolReload() throws IOException { - OzoneConfiguration conf = new OzoneConfiguration(); - final String defaultPool = "DefaultPool"; - final int nodeCount = 4; - final List nodes = TestUtils - .getListOfDatanodeDetails(nodeCount); - - try { - try { - SCMNodePoolManager npMgr = createNodePoolManager(conf); - assertEquals(0, npMgr.getNodePools().size()); - for (DatanodeDetails node : nodes) { - npMgr.addNode(defaultPool, node); - } - List nodesRetrieved = npMgr.getNodes(defaultPool); - assertEquals(nodeCount, nodesRetrieved.size()); - assertTwoDatanodeListsEqual(nodes, nodesRetrieved); - npMgr.close(); - } finally { - LOG.info("testDefaultNodePoolReload: Finish adding nodes to pool" + - " and close."); - } - - // try reload with a new NodePoolManager instance - try { - SCMNodePoolManager npMgr = createNodePoolManager(conf); - List nodesRetrieved = npMgr.getNodes(defaultPool); - assertEquals(nodeCount, nodesRetrieved.size()); - assertTwoDatanodeListsEqual(nodes, nodesRetrieved); - } finally { - LOG.info("testDefaultNodePoolReload: Finish reloading node pool."); - } - } finally { - FileUtil.fullyDelete(testDir); - } - } - - /** - * Compare and verify that two datanode lists are equal. - * @param list1 - datanode list 1. - * @param list2 - datanode list 2. - */ - private void assertTwoDatanodeListsEqual(List list1, - List list2) { - assertEquals(list1.size(), list2.size()); - Collections.sort(list1); - Collections.sort(list2); - assertTrue(ListUtils.isEqualList(list1, list2)); - } -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 072d821247..1a4dcd7ad2 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.CommandQueue; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.node.NodePoolManager; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; @@ -201,10 +200,6 @@ public SCMNodeMetric getNodeStat(DatanodeDetails dd) { return null; } - @Override - public NodePoolManager getNodePoolManager() { - return Mockito.mock(NodePoolManager.class); - } /** * Wait for the heartbeat is processed by NodeManager. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java deleted file mode 100644 index ffcd752e84..0000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.testutils; - -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.NodePoolManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * Pool Manager replication mock. - */ -public class ReplicationNodePoolManagerMock implements NodePoolManager { - - private final Map nodeMemberShip; - - /** - * A node pool manager for testing. - */ - public ReplicationNodePoolManagerMock() { - nodeMemberShip = new HashMap<>(); - } - - /** - * Add a node to a node pool. - * - * @param pool - name of the node pool. - * @param node - data node. - */ - @Override - public void addNode(String pool, DatanodeDetails node) { - nodeMemberShip.put(node, pool); - } - - /** - * Remove a node from a node pool. - * - * @param pool - name of the node pool. - * @param node - data node. - * @throws SCMException - */ - @Override - public void removeNode(String pool, DatanodeDetails node) - throws SCMException { - nodeMemberShip.remove(node); - - } - - /** - * Get a list of known node pools. - * - * @return a list of known node pool names or an empty list if not node pool - * is defined. - */ - @Override - public List getNodePools() { - Set poolSet = new HashSet<>(); - for (Map.Entry entry : nodeMemberShip.entrySet()) { - poolSet.add(entry.getValue()); - } - return new ArrayList<>(poolSet); - - } - - /** - * Get all nodes of a node pool given the name of the node pool. - * - * @param pool - name of the node pool. - * @return a list of datanode ids or an empty list if the node pool was not - * found. - */ - @Override - public List getNodes(String pool) { - Set datanodeSet = new HashSet<>(); - for (Map.Entry entry : nodeMemberShip.entrySet()) { - if (entry.getValue().equals(pool)) { - datanodeSet.add(entry.getKey()); - } - } - return new ArrayList<>(datanodeSet); - } - - /** - * Get the node pool name if the node has been added to a node pool. - * - * @param datanodeDetails DatanodeDetails. - * @return node pool name if it has been assigned. null if the node has not - * been assigned to any node pool yet. - */ - @Override - public String getNodePool(DatanodeDetails datanodeDetails) { - return nodeMemberShip.get(datanodeDetails); - } - - /** - * Closes this stream and releases any system resources associated - * with it. If the stream is already closed then invoking this - * method has no effect. - *

- *

As noted in {@link AutoCloseable#close()}, cases where the - * close may fail require careful attention. It is strongly advised - * to relinquish the underlying resources and to internally - * mark the {@code Closeable} as closed, prior to throwing - * the {@code IOException}. - * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - - } -} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java index 4d70af84a2..b4ed2b12c2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java @@ -51,12 +51,9 @@ import java.util.HashMap; import java.util.UUID; -import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB; import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB; import static org.apache.hadoop.ozone.OzoneConsts.KB; -import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * This class tests the CLI that transforms container into SQLite DB files. @@ -176,34 +173,6 @@ public void shutdown() throws InterruptedException { } } - @Test - public void testConvertNodepoolDB() throws Exception { - String dbOutPath = GenericTestUtils.getTempPath( - UUID.randomUUID() + "/out_sql.db"); - String dbRootPath = conf.get(OzoneConfigKeys.OZONE_METADATA_DIRS); - String dbPath = dbRootPath + "/" + NODEPOOL_DB; - String[] args = {"-p", dbPath, "-o", dbOutPath}; - - cli.run(args); - - // verify the sqlite db - HashMap expectedPool = new HashMap<>(); - for (DatanodeDetails dnid : nodeManager.getAllNodes()) { - expectedPool.put(dnid.getUuidString(), "DefaultNodePool"); - } - Connection conn = connectDB(dbOutPath); - String sql = "SELECT * FROM nodePool"; - ResultSet rs = executeQuery(conn, sql); - while(rs.next()) { - String datanodeUUID = rs.getString("datanodeUUID"); - String poolName = rs.getString("poolName"); - assertTrue(expectedPool.remove(datanodeUUID).equals(poolName)); - } - assertEquals(0, expectedPool.size()); - - Files.delete(Paths.get(dbOutPath)); - } - @Test public void testConvertContainerDB() throws Exception { String dbOutPath = GenericTestUtils.getTempPath( diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java index 2bd43fb93a..edc0d7b597 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java @@ -19,7 +19,6 @@ import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; -import com.google.protobuf.ByteString; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -60,13 +59,11 @@ import java.util.HashSet; import java.util.Set; -import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB; import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_SUFFIX; import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME; import static org.apache.hadoop.ozone.OzoneConsts.KSM_USER_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.KSM_BUCKET_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.KSM_VOLUME_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB; import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB; /** @@ -111,15 +108,6 @@ public class SQLCLI extends Configured implements Tool { private static final String INSERT_CONTAINER_MEMBERS = "INSERT INTO containerMembers (containerName, datanodeUUID) " + "VALUES (\"%s\", \"%s\")"; - // for nodepool.db - private static final String CREATE_NODE_POOL = - "CREATE TABLE nodePool (" + - "datanodeUUID TEXT NOT NULL," + - "poolName TEXT NOT NULL," + - "PRIMARY KEY(datanodeUUID, poolName))"; - private static final String INSERT_NODE_POOL = - "INSERT INTO nodePool (datanodeUUID, poolName) " + - "VALUES (\"%s\", \"%s\")"; // and reuse CREATE_DATANODE_INFO and INSERT_DATANODE_INFO // for openContainer.db private static final String CREATE_OPEN_CONTAINER = @@ -285,9 +273,6 @@ public int run(String[] args) throws Exception { if (dbName.toString().endsWith(CONTAINER_DB_SUFFIX)) { LOG.info("Converting container DB"); convertContainerDB(dbPath, outPath); - } else if (dbName.toString().equals(NODEPOOL_DB)) { - LOG.info("Converting node pool DB"); - convertNodePoolDB(dbPath, outPath); } else if (dbName.toString().equals(OPEN_CONTAINERS_DB)) { LOG.info("Converting open container DB"); convertOpenContainerDB(dbPath, outPath); @@ -543,66 +528,7 @@ private void insertContainerDB(Connection conn, long containerID, } LOG.info("Insertion completed."); } - /** - * Converts nodePool.db to sqlite. The schema of sql db: - * two tables, nodePool and datanodeInfo (the same datanode Info as for - * container.db). - * - * nodePool - * --------------------------------------------------------- - * datanodeUUID* | poolName* - * --------------------------------------------------------- - * - * datanodeInfo: - * --------------------------------------------------------- - * hostname | datanodeUUid* | xferPort | ipcPort - * --------------------------------------------------------- - * - * -------------------------------- - * |containerPort - * -------------------------------- - * - * @param dbPath path to container db. - * @param outPath path to output sqlite - * @throws IOException throws exception. - */ - private void convertNodePoolDB(Path dbPath, Path outPath) throws Exception { - LOG.info("Create table for sql node pool db."); - File dbFile = dbPath.toFile(); - try (MetadataStore dbStore = MetadataStoreBuilder.newBuilder() - .setConf(conf).setDbFile(dbFile).build(); - Connection conn = connectDB(outPath.toString())) { - executeSQL(conn, CREATE_NODE_POOL); - executeSQL(conn, CREATE_DATANODE_INFO); - dbStore.iterate(null, (key, value) -> { - DatanodeDetails nodeId = DatanodeDetails - .getFromProtoBuf(HddsProtos.DatanodeDetailsProto - .PARSER.parseFrom(key)); - String blockPool = DFSUtil.bytes2String(value); - try { - insertNodePoolDB(conn, blockPool, nodeId); - return true; - } catch (SQLException e) { - throw new IOException(e); - } - }); - } - } - - private void insertNodePoolDB(Connection conn, String blockPool, - DatanodeDetails datanodeDetails) throws SQLException { - String insertNodePool = String.format(INSERT_NODE_POOL, - datanodeDetails.getUuidString(), blockPool); - executeSQL(conn, insertNodePool); - - String insertDatanodeDetails = String - .format(INSERT_DATANODE_INFO, datanodeDetails.getHostName(), - datanodeDetails.getUuidString(), datanodeDetails.getIpAddress(), - datanodeDetails.getPort(DatanodeDetails.Port.Name.STANDALONE) - .getValue()); - executeSQL(conn, insertDatanodeDetails); - } /** * Convert openContainer.db to sqlite db file. This is rather simple db,