From 0cc166c053dfe1af3372591ca6151e0e3b4741ea Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Tue, 25 Jul 2017 13:28:33 -0700 Subject: [PATCH] HDFS-12115. Ozone: SCM: Add queryNode RPC Call. Contributed by Anu Engineer. --- .../scm/client/ContainerOperationClient.java | 19 +++ .../apache/hadoop/scm/client/ScmClient.java | 13 ++ .../StorageContainerLocationProtocol.java | 11 ++ ...ocationProtocolClientSideTranslatorPB.java | 31 ++++ .../src/main/proto/Ozone.proto | 32 +++- .../StorageContainerLocationProtocol.proto | 32 ++++ ...ocationProtocolServerSideTranslatorPB.java | 23 +++ .../ozone/scm/StorageContainerManager.java | 142 ++++++++++++++---- .../placement/algorithms/SCMCommonPolicy.java | 3 +- .../container/replication/InProgressPool.java | 22 ++- .../hadoop/ozone/scm/node/NodeManager.java | 23 +-- .../hadoop/ozone/scm/node/SCMNodeManager.java | 23 +-- .../hadoop/cblock/util/MockStorageClient.java | 18 +++ .../apache/hadoop/ozone/MiniOzoneCluster.java | 5 +- .../ReplicationDatanodeStateManager.java | 4 +- .../TestUtils/ReplicationNodeManagerMock.java | 13 +- .../placement/TestContainerPlacement.java | 4 +- .../TestContainerReplicationManager.java | 9 +- .../ozone/scm/container/MockNodeManager.java | 92 ++++++++++-- .../scm/node/TestContainerPlacement.java | 3 +- .../ozone/scm/node/TestNodeManager.java | 24 +-- .../hadoop/ozone/scm/node/TestQueryNode.java | 124 +++++++++++++++ 22 files changed, 563 insertions(+), 107 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java index 481732a0f6..5ee70bc08d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadContainerResponseProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.scm.XceiverClientManager; @@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import java.util.UUID; @@ -130,6 +132,23 @@ public Pipeline createContainer(String containerId, } } + /** + * Returns a set of Nodes that meet a query criteria. + * + * @param nodeStatuses - A set of criteria that we want the node to have. + * @param queryScope - Query scope - Cluster or pool. + * @param poolName - if it is pool, a pool name is required. + * @return A set of nodes that meet the requested criteria. + * @throws IOException + */ + @Override + public OzoneProtos.NodePool queryNode(EnumSet + nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName) + throws IOException { + return storageContainerLocationClient.queryNode(nodeStatuses, queryScope, + poolName); + } + /** * Delete the container, this will release any resource it uses. * @param pipeline - Pipeline that represents the container. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java index 22180b16b0..c651a8bb67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java @@ -20,8 +20,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import java.io.IOException; +import java.util.EnumSet; import java.util.List; /** @@ -128,4 +130,15 @@ public static ReplicationFactor parseReplicationFactor(int i) { */ Pipeline createContainer(String containerId, ReplicationFactor replicationFactor) throws IOException; + + /** + * Returns a set of Nodes that meet a query criteria. + * @param nodeStatuses - A set of criteria that we want the node to have. + * @param queryScope - Query scope - Cluster or pool. + * @param poolName - if it is pool, a pool name is required. + * @return A set of nodes that meet the requested criteria. + * @throws IOException + */ + OzoneProtos.NodePool queryNode(EnumSet nodeStatuses, + OzoneProtos.QueryScope queryScope, String poolName) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java index 485a4d2801..6bb5800070 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java @@ -19,10 +19,12 @@ package org.apache.hadoop.scm.protocol; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; /** * ContainerLocationProtocol is used by an HDFS node to find the set of nodes @@ -88,4 +90,13 @@ List listContainer(String startName, String prefixName, int count) * or container doesn't exist. */ void deleteContainer(String containerName) throws IOException; + + /** + * Queries a list of Node Statuses. + * @param nodeStatuses + * @return List of Datanodes. + */ + OzoneProtos.NodePool queryNode(EnumSet nodeStatuses, + OzoneProtos.QueryScope queryScope, String poolName) throws IOException; + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 7e059de020..7ec4a8675f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -35,11 +35,15 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto; + import org.apache.hadoop.scm.container.common.helpers.Pipeline; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; /** @@ -186,6 +190,33 @@ public void deleteContainer(String containerName) } } + /** + * Queries a list of Node Statuses. + * + * @param nodeStatuses + * @return List of Datanodes. + */ + @Override + public OzoneProtos.NodePool queryNode(EnumSet + nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName) + throws IOException { + // TODO : We support only cluster wide query right now. So ignoring checking + // queryScope and poolName + Preconditions.checkNotNull(nodeStatuses); + Preconditions.checkState(nodeStatuses.size() > 0); + NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder() + .addAllQuery(nodeStatuses) + .setScope(queryScope).setPoolName(poolName).build(); + try { + NodeQueryResponseProto response = + rpcProxy.queryNode(NULL_RPC_CONTROLLER, request); + return response.getDatanodes(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + + } + @Override public Object getUnderlyingProxyObject() { return rpcProxy; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto index 740038ae9f..ede6ea96ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto @@ -40,4 +40,34 @@ message Pipeline { message KeyValue { required string key = 1; optional string value = 2; -} \ No newline at end of file +} + +/** + * Enum that represents the Node State. This is used in calls to getNodeList + * and getNodeCount. + */ +enum NodeState { + HEALTHY = 1; + STALE = 2; + DEAD = 3; + DECOMMISSIONING = 4; + DECOMMISSIONED = 5; + RAFT_MEMBER = 6; + FREE_NODE = 7; // Not a member in raft. + UNKNOWN = 8; +} + +enum QueryScope { + CLUSTER = 1; + POOL = 2; +} + +message Node { + required DatanodeIDProto nodeID = 1; + repeated NodeState nodeStates = 2; +} + +message NodePool { + repeated Node nodes = 1; +} + diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto index 47eb9871ef..6c163479b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto @@ -84,6 +84,33 @@ message DeleteContainerResponseProto { // Empty response } +/* + NodeQueryRequest sends a request to SCM asking to send a list of nodes that + match the NodeState that we are requesting. +*/ +message NodeQueryRequestProto { + + + // Repeated, So we can specify more than one status type. + // These NodeState types are additive for now, in the sense that + // if you specify HEALTHY and FREE_NODE members -- + // Then you get all healthy node which are not raft members. + // + // if you specify all healthy and dead nodes, you will get nothing + // back. Server is not going to dictate what combinations make sense, + // it is entirely up to the caller. + // TODO: Support operators like OR and NOT. Currently it is always an + // implied AND. + + repeated hadoop.hdfs.ozone.NodeState query = 1; + required hadoop.hdfs.ozone.QueryScope scope = 2; + optional string poolName = 3; // if scope is pool, then pool name is needed. +} + +message NodeQueryResponseProto { + required hadoop.hdfs.ozone.NodePool datanodes = 1; +} + /** * Protocol used from an HDFS node to StorageContainerManager. See the request @@ -107,4 +134,9 @@ service StorageContainerLocationProtocolService { * Deletes a container in SCM. */ rpc deleteContainer(DeleteContainerRequestProto) returns (DeleteContainerResponseProto); + + /** + * Returns a set of Nodes that meet a criteria. + */ + rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index bf1281ed80..e45afb9763 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -18,12 +18,16 @@ package org.apache.hadoop.ozone.protocolPB; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos; import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; import static org.apache.hadoop.ozone.protocol.proto @@ -138,4 +142,23 @@ public DeleteContainerResponseProto deleteContainer( throw new ServiceException(e); } } + + @Override + public StorageContainerLocationProtocolProtos.NodeQueryResponseProto + queryNode(RpcController controller, + StorageContainerLocationProtocolProtos.NodeQueryRequestProto request) + throws ServiceException { + try { + EnumSet nodeStateEnumSet = EnumSet.copyOf(request + .getQueryList()); + OzoneProtos.NodePool datanodes = impl.queryNode(nodeStateEnumSet, + request.getScope(), request.getPoolName()); + return StorageContainerLocationProtocolProtos + .NodeQueryResponseProto.newBuilder() + .setDatanodes(datanodes) + .build(); + } catch (Exception e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index 4c10387ed5..409b182846 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -31,20 +31,12 @@ import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; -import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos; -import org.apache.hadoop.ozone.protocolPB - .ScmBlockLocationProtocolServerSideTranslatorPB; -import org.apache.hadoop.ozone.scm.block.BlockManager; -import org.apache.hadoop.ozone.scm.block.BlockManagerImpl; -import org.apache.hadoop.ozone.scm.exceptions.SCMException; -import org.apache.hadoop.scm.client.ScmClient; -import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; -import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult; -import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; -import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; +import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.protocol.proto @@ -61,6 +53,8 @@ .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.protocol.proto @@ -71,20 +65,28 @@ .StorageContainerDatanodeProtocolProtos.Type; import org.apache.hadoop.ozone.protocol.proto .StorageContainerLocationProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; +import org.apache.hadoop.ozone.protocolPB + .ScmBlockLocationProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; import org.apache.hadoop.ozone.protocolPB .StorageContainerDatanodeProtocolServerSideTranslatorPB; -import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.ozone.protocolPB .StorageContainerLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.scm.block.BlockManager; +import org.apache.hadoop.ozone.scm.block.BlockManagerImpl; import org.apache.hadoop.ozone.scm.container.ContainerMapping; import org.apache.hadoop.ozone.scm.container.Mapping; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.SCMNodeManager; +import org.apache.hadoop.scm.client.ScmClient; +import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB; +import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -93,32 +95,30 @@ import javax.management.ObjectName; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Collection; +import java.util.TreeSet; import java.util.UUID; import static org.apache.hadoop.ozone.protocol.proto .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result; - import static org.apache.hadoop.scm.ScmConfigKeys .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_CLIENT_ADDRESS_KEY; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY; import static org.apache.hadoop.scm.ScmConfigKeys .OZONE_SCM_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.scm.ScmConfigKeys .OZONE_SCM_DB_CACHE_SIZE_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_DB_CACHE_SIZE_MB; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; import static org.apache.hadoop.scm.ScmConfigKeys .OZONE_SCM_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HANDLER_COUNT_KEY; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY; import static org.apache.hadoop.util.ExitUtil.terminate; /** @@ -134,7 +134,7 @@ @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) public class StorageContainerManager implements StorageContainerDatanodeProtocol, - StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean{ + StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean { private static final Logger LOG = LoggerFactory.getLogger(StorageContainerManager.class); @@ -420,6 +420,96 @@ public void deleteContainer(String containerName) throws IOException { scmContainerManager.deleteContainer(containerName); } + /** + * Queries a list of Node Statuses. + * + * @param nodeStatuses + * @param queryScope + * @param poolName @return List of Datanodes. + */ + @Override + public OzoneProtos.NodePool queryNode(EnumSet nodeStatuses, + OzoneProtos.QueryScope queryScope, String poolName) throws IOException { + + if (queryScope == OzoneProtos.QueryScope.POOL) { + throw new IllegalArgumentException("Not Supported yet"); + } + + List datanodes = queryNode(nodeStatuses); + OzoneProtos.NodePool.Builder poolBuilder = + OzoneProtos.NodePool.newBuilder(); + + for (DatanodeID datanode : datanodes) { + OzoneProtos.Node node = OzoneProtos.Node.newBuilder() + .setNodeID(datanode.getProtoBufMessage()) + .addAllNodeStates(nodeStatuses) + .build(); + poolBuilder.addNodes(node); + } + + return poolBuilder.build(); + } + + /** + * Queries a list of Node that match a set of statuses. + *

+ * For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER, + * then this call will return all healthy nodes which members in + * Raft pipeline. + *

+ * Right now we don't support operations, so we assume it is an AND operation + * between the operators. + * + * @param nodeStatuses - A set of NodeStates. + * @return List of Datanodes. + */ + + public List queryNode(EnumSet nodeStatuses) { + Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null"); + Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " + + "in the query set"); + List resultList = new LinkedList<>(); + Set currentSet = new TreeSet<>(); + + for (NodeState nodeState : nodeStatuses) { + Set nextSet = queryNodeState(nodeState); + if ((nextSet == null) || (nextSet.size() == 0)) { + // Right now we only support AND operation. So intersect with + // any empty set is null. + return resultList; + } + // First time we have to add all the elements, next time we have to + // do an intersection operation on the set. + if (currentSet.size() == 0) { + currentSet.addAll(nextSet); + } else { + currentSet.retainAll(nextSet); + } + } + + resultList.addAll(currentSet); + return resultList; + } + + /** + * Query the System for Nodes. + * + * @param nodeState - NodeState that we are interested in matching. + * @return Set of Datanodes that match the NodeState. + */ + private Set queryNodeState(NodeState nodeState) { + if (nodeState == NodeState.RAFT_MEMBER || + nodeState == NodeState.FREE_NODE) { + throw new IllegalStateException("Not implemented yet"); + } + Set returnSet = new TreeSet<>(); + List tmp = getScmNodeManager().getNodes(nodeState); + if ((tmp != null) && (tmp.size() > 0)) { + returnSet.addAll(tmp); + } + return returnSet; + } + /** * Asks SCM where a container should be allocated. SCM responds with the set * of datanodes that should be used creating this container. @@ -610,14 +700,14 @@ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, * @param nodestate Healthy, Dead etc. * @return int -- count */ - public int getNodeCount(SCMNodeManager.NODESTATE nodestate) { + public int getNodeCount(NodeState nodestate) { return scmNodeManager.getNodeCount(nodestate); } @Override public Map getNodeCount() { Map countMap = new HashMap(); - for (SCMNodeManager.NODESTATE state : SCMNodeManager.NODESTATE.values()) { + for (NodeState state : NodeState.values()) { countMap.put(state.toString(), scmNodeManager.getNodeCount(state)); } return countMap; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java index 36712ef10a..b5a1a139d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -103,7 +104,7 @@ public Configuration getConf() { public List chooseDatanodes(int nodesRequired, final long sizeRequired) throws SCMException { List healthyNodes = - nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY); + nodeManager.getNodes(OzoneProtos.NodeState.HEALTHY); String msg; if (healthyNodes.size() == 0) { msg = "No healthy node found to allocate container."; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java index 87629e1307..d3dcc01d1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java @@ -20,10 +20,9 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerInfo; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.ozone.scm.node.CommandQueue; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.NodePoolManager; @@ -40,11 +39,10 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import static com.google.common.util.concurrent.Uninterruptibles - .sleepUninterruptibly; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.UNKNOWN; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.STALE; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNOWN; /** * These are pools that are actively checking for replication status of the @@ -183,7 +181,7 @@ public void startReconciliation() { */ SendContainerCommand cmd = SendContainerCommand.newBuilder().build(); for (DatanodeID id : datanodeIDList) { - NodeManager.NODESTATE currentState = getNodestate(id); + NodeState currentState = getNodestate(id); if (currentState == HEALTHY || currentState == STALE) { nodeCount.incrementAndGet(); // Queue commands to all datanodes in this pool to send us container @@ -202,8 +200,8 @@ public void startReconciliation() { * @param id - datanode ID. * @return NodeState. */ - private NodeManager.NODESTATE getNodestate(DatanodeID id) { - NodeManager.NODESTATE currentState = UNKNOWN; + private NodeState getNodestate(DatanodeID id) { + NodeState currentState = UNKNOWN; int maxTry = 100; // We need to loop to make sure that we will retry if we get // node state unknown. This can lead to infinite loop if we send diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java index 3670f56e54..c21e62c8f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; @@ -62,18 +63,17 @@ public interface NodeManager extends StorageContainerNodeProtocol, /** * Gets all Live Datanodes that is currently communicating with SCM. - * @param nodestate - State of the node + * @param nodeState - State of the node * @return List of Datanodes that are Heartbeating SCM. */ - - List getNodes(NODESTATE nodestate); + List getNodes(NodeState nodeState); /** * Returns the Number of Datanodes that are communicating with SCM. - * @param nodestate - State of the node + * @param nodeState - State of the node * @return int -- count */ - int getNodeCount(NODESTATE nodestate); + int getNodeCount(NodeState nodeState); /** * Get all datanodes known to SCM. @@ -102,17 +102,6 @@ public interface NodeManager extends StorageContainerNodeProtocol, */ void clearChillModeFlag(); - /** - * Enum that represents the Node State. This is used in calls to getNodeList - * and getNodeCount. TODO: Add decommission when we support it. - */ - enum NODESTATE { - HEALTHY, - STALE, - DEAD, - UNKNOWN - } - /** * Returns the aggregated node stats. * @return the aggregated node stats. @@ -144,5 +133,5 @@ enum NODESTATE { * @param id - DatanodeID * @return Healthy/Stale/Dead. */ - NODESTATE getNodeState(DatanodeID id); + NodeState getNodeState(DatanodeID id); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index 235809b4a3..dc7db2df02 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -32,6 +32,7 @@ import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto .ErrorCode; @@ -64,6 +65,10 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.DEAD; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.STALE; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNOWN; import static org.apache.hadoop.util.Time.monotonicNow; /** @@ -212,7 +217,7 @@ public void removeNode(DatanodeID node) throws UnregisteredNodeException { * @return List of Datanodes that are known to SCM in the requested state. */ @Override - public List getNodes(NODESTATE nodestate) + public List getNodes(NodeState nodestate) throws IllegalArgumentException { Map set; switch (nodestate) { @@ -368,7 +373,7 @@ public void forceEnterChillMode() { * @return int -- count */ @Override - public int getNodeCount(NODESTATE nodestate) { + public int getNodeCount(NodeState nodestate) { switch (nodestate) { case HEALTHY: return healthyNodeCount.get(); @@ -383,7 +388,7 @@ public int getNodeCount(NODESTATE nodestate) { // that this information might not be consistent always. return 0; default: - throw new IllegalArgumentException("Unknown node state requested."); + return 0; } } @@ -405,7 +410,7 @@ public boolean waitForHeartbeatProcessed() { * @return Healthy/Stale/Dead/Unknown. */ @Override - public NODESTATE getNodeState(DatanodeID id) { + public NodeState getNodeState(DatanodeID id) { // There is a subtle race condition here, hence we also support // the NODEState.UNKNOWN. It is possible that just before we check the // healthyNodes, we have removed the node from the healthy list but stil @@ -415,18 +420,18 @@ public NODESTATE getNodeState(DatanodeID id) { // just deal with the possibilty of getting a state called unknown. if(healthyNodes.containsKey(id.getDatanodeUuid())) { - return NODESTATE.HEALTHY; + return HEALTHY; } if(staleNodes.containsKey(id.getDatanodeUuid())) { - return NODESTATE.STALE; + return STALE; } if(deadNodes.containsKey(id.getDatanodeUuid())) { - return NODESTATE.DEAD; + return DEAD; } - return NODESTATE.UNKNOWN; + return UNKNOWN; } /** @@ -826,7 +831,7 @@ public SCMNodeMetric getNodeStat(DatanodeID datanodeID) { @Override public Map getNodeCount() { Map nodeCountMap = new HashMap(); - for(NodeManager.NODESTATE state : NodeManager.NODESTATE.values()) { + for(NodeState state : NodeState.values()) { nodeCountMap.put(state.toString(), getNodeCount(state)); } return nodeCountMap; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java index ff5b0af57c..25571a59e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java @@ -19,11 +19,13 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import java.io.IOException; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; /** @@ -121,4 +123,20 @@ public Pipeline createContainer(String containerId, ContainerLookUpService.addContainer(Long.toString(currentContainerId)); return ContainerLookUpService.lookUp(Long.toString(currentContainerId)) .getPipeline(); } + + /** + * Returns a set of Nodes that meet a query criteria. + * + * @param nodeStatuses - A set of criteria that we want the node to have. + * @param queryScope - Query scope - Cluster or pool. + * @param poolName - if it is pool, a pool name is required. + * @return A set of nodes that meet the requested criteria. + * @throws IOException + */ + @Override + public OzoneProtos.NodePool queryNode(EnumSet + nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName) + throws IOException { + return null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index a0dd6bddfd..f2a8be4835 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -37,7 +37,6 @@ .StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.ozone.scm.StorageContainerManager; -import org.apache.hadoop.ozone.scm.node.SCMNodeManager; import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; @@ -56,6 +55,8 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState + .HEALTHY; import static org.junit.Assert.assertFalse; /** @@ -236,7 +237,7 @@ public OzoneRestClient createOzoneRestClient() throws OzoneException { */ public void waitOzoneReady() throws TimeoutException, InterruptedException { GenericTestUtils.waitFor(() -> { - final int healthy = scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY); + final int healthy = scm.getNodeCount(HEALTHY); final boolean isReady = healthy >= numDataNodes; LOG.info("{}. Got {} of {} DN Heartbeats.", isReady? "Cluster is ready" : "Waiting for cluster to be ready", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java index d0f440f0a4..3f7a4abe88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Random; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY; + /** * This class manages the state of datanode * in conjunction with the node pool and node managers. @@ -74,7 +76,7 @@ public List getContainerReport(String containerName, DatanodeID id = nodesInPool.get(r.nextInt(nodesInPool.size())); nodesInPool.remove(id); // We return container reports only for nodes that are healthy. - if (nodeManager.getNodeState(id) == NodeManager.NODESTATE.HEALTHY) { + if (nodeManager.getNodeState(id) == HEALTHY) { ContainerInfo info = ContainerInfo.newBuilder() .setContainerName(containerName) .setFinalhash(DigestUtils.sha256Hex(containerName)) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java index e432c26908..8d3da99cff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java @@ -32,18 +32,19 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; /** * A Node Manager to test replication. */ public class ReplicationNodeManagerMock implements NodeManager { - private final Map nodeStateMap; + private final Map nodeStateMap; /** * A list of Datanodes and current states. * @param nodeState A node state map. */ - public ReplicationNodeManagerMock(Map nodeState) { + public ReplicationNodeManagerMock(Map nodeState) { Preconditions.checkNotNull(nodeState); nodeStateMap = nodeState; } @@ -118,7 +119,7 @@ public void removeNode(DatanodeID node) throws UnregisteredNodeException { * @return List of Datanodes that are Heartbeating SCM. */ @Override - public List getNodes(NODESTATE nodestate) { + public List getNodes(NodeState nodestate) { return null; } @@ -129,7 +130,7 @@ public List getNodes(NODESTATE nodestate) { * @return int -- count */ @Override - public int getNodeCount(NODESTATE nodestate) { + public int getNodeCount(NodeState nodestate) { return 0; } @@ -220,7 +221,7 @@ public boolean waitForHeartbeatProcessed() { * @return Healthy/Stale/Dead. */ @Override - public NODESTATE getNodeState(DatanodeID id) { + public NodeState getNodeState(DatanodeID id) { return nodeStateMap.get(id); } @@ -308,7 +309,7 @@ public void clearMap() { * @param id - DatanodeID * @param state State you want to put that node to. */ - public void addNode(DatanodeID id, NODESTATE state) { + public void addNode(DatanodeID id, NodeState state) { nodeStateMap.put(id, state); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java index d798c61845..64e012d899 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Random; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState + .HEALTHY; import static org.junit.Assert.assertEquals; /** @@ -40,7 +42,7 @@ public class TestContainerPlacement { private DescriptiveStatistics computeStatistics(NodeManager nodeManager) { DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics(); - for (DatanodeID id : nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY)) { + for (DatanodeID id : nodeManager.getNodes(HEALTHY)) { float weightedValue = nodeManager.getNodeStat(id).get().getScmUsed().get() / (float) nodeManager.getNodeStat(id).get().getCapacity().get(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java index 3f3aba9def..b631464f42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java @@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.container.TestUtils .ReplicationNodePoolManagerMock; import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.ozone.scm.container.replication @@ -49,6 +50,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY; import static org.apache.hadoop.scm.ScmConfigKeys .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS; import static org.apache.ratis.shaded.com.google.common.util.concurrent @@ -80,13 +82,13 @@ public void tearDown() throws Exception { @Before public void setUp() throws Exception { GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.DEBUG); - Map nodeStateMap = new HashMap<>(); + Map nodeStateMap = new HashMap<>(); // We are setting up 3 pools with 24 nodes each in this cluster. // First we create 72 Datanodes. for (int x = 0; x < MAX_DATANODES; x++) { DatanodeID datanode = SCMTestUtils.getDatanodeID(); datanodes.add(datanode); - nodeStateMap.put(datanode, NodeManager.NODESTATE.HEALTHY); + nodeStateMap.put(datanode, HEALTHY); } // All nodes in this cluster are healthy for time being. @@ -239,8 +241,7 @@ public void testAddingNewPoolWorks() GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG); try { DatanodeID id = SCMTestUtils.getDatanodeID(); - ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, NodeManager - .NODESTATE.HEALTHY); + ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY); poolManager.addNode("PoolNew", id); GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains("PoolNew"), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java index e999ca2c45..e1d20a4ce8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -22,6 +22,7 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.protocol.proto @@ -36,13 +37,18 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.DEAD; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState + .HEALTHY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState + .STALE; + /** * Test Helper for testing container Mapping. */ public class MockNodeManager implements NodeManager { - private static final int HEALTHY_NODE_COUNT = 10; private final static NodeData[] NODES = { - new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB), + new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB), new NodeData(64L * OzoneConsts.TB, 100 * OzoneConsts.GB), new NodeData(128L * OzoneConsts.TB, 256 * OzoneConsts.GB), new NodeData(40L * OzoneConsts.TB, OzoneConsts.TB), @@ -50,20 +56,26 @@ public class MockNodeManager implements NodeManager { new NodeData(20L * OzoneConsts.TB, 10 * OzoneConsts.GB), new NodeData(32L * OzoneConsts.TB, 16 * OzoneConsts.TB), new NodeData(OzoneConsts.TB, 900 * OzoneConsts.GB), + new NodeData(OzoneConsts.TB, 900 * OzoneConsts.GB, NodeData.STALE), + new NodeData(OzoneConsts.TB, 200L * OzoneConsts.GB, NodeData.STALE), + new NodeData(OzoneConsts.TB, 200L * OzoneConsts.GB, NodeData.DEAD) }; private final List healthyNodes; + private final List staleNodes; + private final List deadNodes; private final Map nodeMetricMap; private final SCMNodeStat aggregateStat; private boolean chillmode; public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { this.healthyNodes = new LinkedList<>(); + this.staleNodes = new LinkedList<>(); + this.deadNodes = new LinkedList<>(); this.nodeMetricMap = new HashMap<>(); aggregateStat = new SCMNodeStat(); if (initializeFakeNodes) { for (int x = 0; x < nodeCount; x++) { DatanodeID id = SCMTestUtils.getDatanodeID(); - healthyNodes.add(id); populateNodeMetric(id, x); } } @@ -84,6 +96,19 @@ private void populateNodeMetric(DatanodeID datanodeID, int x) { (NODES[x % NODES.length].used), remaining); this.nodeMetricMap.put(datanodeID.toString(), newStat); aggregateStat.add(newStat); + + if (NODES[x % NODES.length].getCurrentState() == NodeData.HEALTHY) { + healthyNodes.add(datanodeID); + } + + if (NODES[x % NODES.length].getCurrentState() == NodeData.STALE) { + staleNodes.add(datanodeID); + } + + if (NODES[x % NODES.length].getCurrentState() == NodeData.DEAD) { + deadNodes.add(datanodeID); + } + } /** @@ -112,10 +137,19 @@ public void removeNode(DatanodeID node) throws UnregisteredNodeException { * @return List of Datanodes that are Heartbeating SCM. */ @Override - public List getNodes(NODESTATE nodestate) { - if (nodestate == NODESTATE.HEALTHY) { + public List getNodes(OzoneProtos.NodeState nodestate) { + if (nodestate == HEALTHY) { return healthyNodes; } + + if (nodestate == STALE) { + return staleNodes; + } + + if (nodestate == DEAD) { + return deadNodes; + } + return null; } @@ -126,9 +160,10 @@ public List getNodes(NODESTATE nodestate) { * @return int -- count */ @Override - public int getNodeCount(NODESTATE nodestate) { - if (nodestate == NODESTATE.HEALTHY) { - return HEALTHY_NODE_COUNT; + public int getNodeCount(OzoneProtos.NodeState nodestate) { + List nodes = getNodes(nodestate); + if (nodes != null) { + return nodes.size(); } return 0; } @@ -258,7 +293,7 @@ public boolean waitForHeartbeatProcessed() { * @return Healthy/Stale/Dead. */ @Override - public NODESTATE getNodeState(DatanodeID id) { + public OzoneProtos.NodeState getNodeState(DatanodeID id) { return null; } @@ -341,7 +376,7 @@ public List sendHeartbeat(DatanodeID datanodeID, for (StorageContainerDatanodeProtocolProtos.SCMStorageReport report : storageReports) { totalCapacity += report.getCapacity(); - totalRemaining +=report.getRemaining(); + totalRemaining += report.getRemaining(); totalScmUsed += report.getScmUsed(); } aggregateStat.subtract(stat); @@ -356,7 +391,7 @@ public List sendHeartbeat(DatanodeID datanodeID, @Override public Map getNodeCount() { Map nodeCountMap = new HashMap(); - for (NodeManager.NODESTATE state : NodeManager.NODESTATE.values()) { + for (OzoneProtos.NodeState state : OzoneProtos.NodeState.values()) { nodeCountMap.put(state.toString(), getNodeCount(state)); } return nodeCountMap; @@ -399,17 +434,35 @@ public void delContainer(DatanodeID datanodeID, long size) { * won't fail. */ private static class NodeData { - private long capacity, used; + public static final long HEALTHY = 1; + public static final long STALE = 2; + public static final long DEAD = 3; + + private long capacity; + private long used; + + private long currentState; + + /** + * By default nodes are healthy. + * @param capacity + * @param used + */ + NodeData(long capacity, long used) { + this(capacity, used, HEALTHY); + } /** * Constructs a nodeDefinition. * * @param capacity capacity. * @param used used. + * @param currentState - Healthy, Stale and DEAD nodes. */ - NodeData(long capacity, long used) { + NodeData(long capacity, long used, long currentState) { this.capacity = capacity; this.used = used; + this.currentState = currentState; } public long getCapacity() { @@ -427,5 +480,14 @@ public long getUsed() { public void setUsed(long used) { this.used = used; } + + public long getCurrentState() { + return currentState; + } + + public void setCurrentState(long currentState) { + this.currentState = currentState; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java index da0ed16587..134e51c8eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java @@ -46,9 +46,10 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState + .HEALTHY; import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT; import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY; import static org.hamcrest.core.StringStartsWith.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index b483edf79a..30bc5a8065 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol.proto @@ -49,11 +50,14 @@ import java.util.concurrent.TimeoutException; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.DEAD; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState + .HEALTHY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState + .STALE; import static org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.Type; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE; + import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS; import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; @@ -413,8 +417,7 @@ public void testScmDetectStaleAndDeadNode() throws IOException, // Wait for 2 seconds, wait a total of 4 seconds to make sure that the // node moves into stale state. Thread.sleep(2 * 1000); - List staleNodeList = nodeManager.getNodes(NodeManager - .NODESTATE.STALE); + List staleNodeList = nodeManager.getNodes(STALE); assertEquals("Expected to find 1 stale node", 1, nodeManager.getNodeCount(STALE)); assertEquals("Expected to find 1 stale node", @@ -433,8 +436,7 @@ public void testScmDetectStaleAndDeadNode() throws IOException, Thread.sleep(2 * 1000); // the stale node has been removed - staleNodeList = nodeManager.getNodes(NodeManager - .NODESTATE.STALE); + staleNodeList = nodeManager.getNodes(STALE); assertEquals("Expected to find 1 stale node", 0, nodeManager.getNodeCount(STALE)); assertEquals("Expected to find 1 stale node", @@ -682,7 +684,7 @@ private List createNodeSet(SCMNodeManager nodeManager, int * @return true if we found the expected number. */ private boolean findNodes(NodeManager nodeManager, int count, - NodeManager.NODESTATE state) { + OzoneProtos.NodeState state) { return count == nodeManager.getNodeCount(state); } @@ -1056,7 +1058,7 @@ public void testScmNodeReportUpdate() throws IOException, // Wait up to 4s so that the node becomes stale // Verify the usage info should be unchanged. GenericTestUtils.waitFor( - () -> nodeManager.getNodeCount(NodeManager.NODESTATE.STALE) == 1, 100, + () -> nodeManager.getNodeCount(STALE) == 1, 100, 4 * 1000); assertEquals(nodeCount, nodeManager.getNodeStats().size()); @@ -1074,7 +1076,7 @@ public void testScmNodeReportUpdate() throws IOException, // Wait up to 4 more seconds so the node becomes dead // Verify usage info should be updated. GenericTestUtils.waitFor( - () -> nodeManager.getNodeCount(NodeManager.NODESTATE.DEAD) == 1, 100, + () -> nodeManager.getNodeCount(DEAD) == 1, 100, 4 * 1000); assertEquals(0, nodeManager.getNodeStats().size()); @@ -1099,7 +1101,7 @@ public void testScmNodeReportUpdate() throws IOException, // Wait up to 5 seconds so that the dead node becomes healthy // Verify usage info should be updated. GenericTestUtils.waitFor( - () -> nodeManager.getNodeCount(NodeManager.NODESTATE.HEALTHY) == 1, + () -> nodeManager.getNodeCount(HEALTHY) == 1, 100, 5 * 1000); GenericTestUtils.waitFor( () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java new file mode 100644 index 0000000000..48ab8afa99 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.node; + +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.client.ContainerOperationClient; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.EnumSet; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.DEAD; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState + .HEALTHY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState + .STALE; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_DEADNODE_INTERVAL_MS; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_STALENODE_INTERVAL_MS; +import static org.junit.Assert.assertEquals; + +/** + * Test Query Node Operation. + */ +public class TestQueryNode { + private static int numOfDatanodes = 5; + private MiniOzoneCluster cluster; + + private ContainerOperationClient scmClient; + + @Before + public void setUp() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + final int interval = 100; + + conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1, SECONDS); + conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); + conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000); + + cluster = new MiniOzoneCluster.Builder(conf) + .numDataNodes(numOfDatanodes) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED) + .build(); + cluster.waitOzoneReady(); + scmClient = new ContainerOperationClient(cluster + .createStorageContainerLocationClient(), + new XceiverClientManager(conf)); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testHealthyNodesCount() throws Exception { + OzoneProtos.NodePool pool = scmClient.queryNode( + EnumSet.of(HEALTHY), + OzoneProtos.QueryScope.CLUSTER, ""); + assertEquals("Expected live nodes", numOfDatanodes, + pool.getNodesCount()); + } + + @Test(timeout = 10 * 1000L) + public void testStaleNodesCount() throws Exception { + cluster.shutdownDataNode(0); + cluster.shutdownDataNode(1); + + GenericTestUtils.waitFor(() -> + cluster.getStorageContainerManager().getNodeCount(STALE) == 2, + 100, 4 * 1000); + + int nodeCount = scmClient.queryNode(EnumSet.of(STALE), + OzoneProtos.QueryScope.CLUSTER, "").getNodesCount(); + assertEquals("Mismatch of expected nodes count", 2, nodeCount); + + GenericTestUtils.waitFor(() -> + cluster.getStorageContainerManager().getNodeCount(DEAD) == 2, + 100, 4 * 1000); + + // Assert that we don't find any stale nodes. + nodeCount = scmClient.queryNode(EnumSet.of(STALE), + OzoneProtos.QueryScope.CLUSTER, "").getNodesCount(); + assertEquals("Mismatch of expected nodes count", 0, nodeCount); + + // Assert that we find the expected number of dead nodes. + nodeCount = scmClient.queryNode(EnumSet.of(DEAD), + OzoneProtos.QueryScope.CLUSTER, "").getNodesCount(); + assertEquals("Mismatch of expected nodes count", 2, nodeCount); + + cluster.restartDataNode(0); + cluster.restartDataNode(1); + + } +}