HDFS-12115. Ozone: SCM: Add queryNode RPC Call. Contributed by Anu Engineer.
This commit is contained in:
parent
539842ed8b
commit
0cc166c053
@ -19,6 +19,7 @@
|
||||
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadContainerResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.scm.XceiverClientSpi;
|
||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.scm.XceiverClientManager;
|
||||
@ -28,6 +29,7 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@ -130,6 +132,23 @@ public Pipeline createContainer(String containerId,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a set of Nodes that meet a query criteria.
|
||||
*
|
||||
* @param nodeStatuses - A set of criteria that we want the node to have.
|
||||
* @param queryScope - Query scope - Cluster or pool.
|
||||
* @param poolName - if it is pool, a pool name is required.
|
||||
* @return A set of nodes that meet the requested criteria.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState>
|
||||
nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName)
|
||||
throws IOException {
|
||||
return storageContainerLocationClient.queryNode(nodeStatuses, queryScope,
|
||||
poolName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the container, this will release any resource it uses.
|
||||
* @param pipeline - Pipeline that represents the container.
|
||||
|
@ -20,8 +20,10 @@
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -128,4 +130,15 @@ public static ReplicationFactor parseReplicationFactor(int i) {
|
||||
*/
|
||||
Pipeline createContainer(String containerId,
|
||||
ReplicationFactor replicationFactor) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns a set of Nodes that meet a query criteria.
|
||||
* @param nodeStatuses - A set of criteria that we want the node to have.
|
||||
* @param queryScope - Query scope - Cluster or pool.
|
||||
* @param poolName - if it is pool, a pool name is required.
|
||||
* @return A set of nodes that meet the requested criteria.
|
||||
* @throws IOException
|
||||
*/
|
||||
OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses,
|
||||
OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
|
||||
}
|
||||
|
@ -19,10 +19,12 @@
|
||||
package org.apache.hadoop.scm.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.scm.client.ScmClient;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
|
||||
/**
|
||||
* ContainerLocationProtocol is used by an HDFS node to find the set of nodes
|
||||
@ -88,4 +90,13 @@ List<Pipeline> listContainer(String startName, String prefixName, int count)
|
||||
* or container doesn't exist.
|
||||
*/
|
||||
void deleteContainer(String containerName) throws IOException;
|
||||
|
||||
/**
|
||||
* Queries a list of Node Statuses.
|
||||
* @param nodeStatuses
|
||||
* @return List of Datanodes.
|
||||
*/
|
||||
OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses,
|
||||
OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
|
||||
|
||||
}
|
||||
|
@ -35,11 +35,15 @@
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
|
||||
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -186,6 +190,33 @@ public void deleteContainer(String containerName)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queries a list of Node Statuses.
|
||||
*
|
||||
* @param nodeStatuses
|
||||
* @return List of Datanodes.
|
||||
*/
|
||||
@Override
|
||||
public OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState>
|
||||
nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName)
|
||||
throws IOException {
|
||||
// TODO : We support only cluster wide query right now. So ignoring checking
|
||||
// queryScope and poolName
|
||||
Preconditions.checkNotNull(nodeStatuses);
|
||||
Preconditions.checkState(nodeStatuses.size() > 0);
|
||||
NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder()
|
||||
.addAllQuery(nodeStatuses)
|
||||
.setScope(queryScope).setPoolName(poolName).build();
|
||||
try {
|
||||
NodeQueryResponseProto response =
|
||||
rpcProxy.queryNode(NULL_RPC_CONTROLLER, request);
|
||||
return response.getDatanodes();
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getUnderlyingProxyObject() {
|
||||
return rpcProxy;
|
||||
|
@ -40,4 +40,34 @@ message Pipeline {
|
||||
message KeyValue {
|
||||
required string key = 1;
|
||||
optional string value = 2;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Enum that represents the Node State. This is used in calls to getNodeList
|
||||
* and getNodeCount.
|
||||
*/
|
||||
enum NodeState {
|
||||
HEALTHY = 1;
|
||||
STALE = 2;
|
||||
DEAD = 3;
|
||||
DECOMMISSIONING = 4;
|
||||
DECOMMISSIONED = 5;
|
||||
RAFT_MEMBER = 6;
|
||||
FREE_NODE = 7; // Not a member in raft.
|
||||
UNKNOWN = 8;
|
||||
}
|
||||
|
||||
enum QueryScope {
|
||||
CLUSTER = 1;
|
||||
POOL = 2;
|
||||
}
|
||||
|
||||
message Node {
|
||||
required DatanodeIDProto nodeID = 1;
|
||||
repeated NodeState nodeStates = 2;
|
||||
}
|
||||
|
||||
message NodePool {
|
||||
repeated Node nodes = 1;
|
||||
}
|
||||
|
||||
|
@ -84,6 +84,33 @@ message DeleteContainerResponseProto {
|
||||
// Empty response
|
||||
}
|
||||
|
||||
/*
|
||||
NodeQueryRequest sends a request to SCM asking to send a list of nodes that
|
||||
match the NodeState that we are requesting.
|
||||
*/
|
||||
message NodeQueryRequestProto {
|
||||
|
||||
|
||||
// Repeated, So we can specify more than one status type.
|
||||
// These NodeState types are additive for now, in the sense that
|
||||
// if you specify HEALTHY and FREE_NODE members --
|
||||
// Then you get all healthy node which are not raft members.
|
||||
//
|
||||
// if you specify all healthy and dead nodes, you will get nothing
|
||||
// back. Server is not going to dictate what combinations make sense,
|
||||
// it is entirely up to the caller.
|
||||
// TODO: Support operators like OR and NOT. Currently it is always an
|
||||
// implied AND.
|
||||
|
||||
repeated hadoop.hdfs.ozone.NodeState query = 1;
|
||||
required hadoop.hdfs.ozone.QueryScope scope = 2;
|
||||
optional string poolName = 3; // if scope is pool, then pool name is needed.
|
||||
}
|
||||
|
||||
message NodeQueryResponseProto {
|
||||
required hadoop.hdfs.ozone.NodePool datanodes = 1;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Protocol used from an HDFS node to StorageContainerManager. See the request
|
||||
@ -107,4 +134,9 @@ service StorageContainerLocationProtocolService {
|
||||
* Deletes a container in SCM.
|
||||
*/
|
||||
rpc deleteContainer(DeleteContainerRequestProto) returns (DeleteContainerResponseProto);
|
||||
|
||||
/**
|
||||
* Returns a set of Nodes that meet a criteria.
|
||||
*/
|
||||
rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
|
||||
}
|
||||
|
@ -18,12 +18,16 @@
|
||||
package org.apache.hadoop.ozone.protocolPB;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos;
|
||||
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto
|
||||
@ -138,4 +142,23 @@ public DeleteContainerResponseProto deleteContainer(
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public StorageContainerLocationProtocolProtos.NodeQueryResponseProto
|
||||
queryNode(RpcController controller,
|
||||
StorageContainerLocationProtocolProtos.NodeQueryRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
EnumSet<OzoneProtos.NodeState> nodeStateEnumSet = EnumSet.copyOf(request
|
||||
.getQueryList());
|
||||
OzoneProtos.NodePool datanodes = impl.queryNode(nodeStateEnumSet,
|
||||
request.getScope(), request.getPoolName());
|
||||
return StorageContainerLocationProtocolProtos
|
||||
.NodeQueryResponseProto.newBuilder()
|
||||
.setDatanodes(datanodes)
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,20 +31,12 @@
|
||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.ScmBlockLocationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.ozone.scm.block.BlockManager;
|
||||
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
|
||||
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.scm.client.ScmClient;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
@ -61,6 +53,8 @@
|
||||
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
@ -71,20 +65,28 @@
|
||||
.StorageContainerDatanodeProtocolProtos.Type;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.ScmBlockLocationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.StorageContainerDatanodeProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.StorageContainerLocationProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.ozone.scm.block.BlockManager;
|
||||
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
|
||||
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
|
||||
import org.apache.hadoop.ozone.scm.container.Mapping;
|
||||
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
|
||||
import org.apache.hadoop.scm.client.ScmClient;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
@ -93,32 +95,30 @@
|
||||
import javax.management.ObjectName;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Collection;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
|
||||
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_CLIENT_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_DATANODE_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_DB_CACHE_SIZE_MB;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_HANDLER_COUNT_DEFAULT;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_HANDLER_COUNT_KEY;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
|
||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||
|
||||
/**
|
||||
@ -134,7 +134,7 @@
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
|
||||
public class StorageContainerManager
|
||||
implements StorageContainerDatanodeProtocol,
|
||||
StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean{
|
||||
StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(StorageContainerManager.class);
|
||||
@ -420,6 +420,96 @@ public void deleteContainer(String containerName) throws IOException {
|
||||
scmContainerManager.deleteContainer(containerName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queries a list of Node Statuses.
|
||||
*
|
||||
* @param nodeStatuses
|
||||
* @param queryScope
|
||||
* @param poolName @return List of Datanodes.
|
||||
*/
|
||||
@Override
|
||||
public OzoneProtos.NodePool queryNode(EnumSet<NodeState> nodeStatuses,
|
||||
OzoneProtos.QueryScope queryScope, String poolName) throws IOException {
|
||||
|
||||
if (queryScope == OzoneProtos.QueryScope.POOL) {
|
||||
throw new IllegalArgumentException("Not Supported yet");
|
||||
}
|
||||
|
||||
List<DatanodeID> datanodes = queryNode(nodeStatuses);
|
||||
OzoneProtos.NodePool.Builder poolBuilder =
|
||||
OzoneProtos.NodePool.newBuilder();
|
||||
|
||||
for (DatanodeID datanode : datanodes) {
|
||||
OzoneProtos.Node node = OzoneProtos.Node.newBuilder()
|
||||
.setNodeID(datanode.getProtoBufMessage())
|
||||
.addAllNodeStates(nodeStatuses)
|
||||
.build();
|
||||
poolBuilder.addNodes(node);
|
||||
}
|
||||
|
||||
return poolBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Queries a list of Node that match a set of statuses.
|
||||
* <p>
|
||||
* For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER,
|
||||
* then this call will return all healthy nodes which members in
|
||||
* Raft pipeline.
|
||||
* <p>
|
||||
* Right now we don't support operations, so we assume it is an AND operation
|
||||
* between the operators.
|
||||
*
|
||||
* @param nodeStatuses - A set of NodeStates.
|
||||
* @return List of Datanodes.
|
||||
*/
|
||||
|
||||
public List<DatanodeID> queryNode(EnumSet<NodeState> nodeStatuses) {
|
||||
Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null");
|
||||
Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " +
|
||||
"in the query set");
|
||||
List<DatanodeID> resultList = new LinkedList<>();
|
||||
Set<DatanodeID> currentSet = new TreeSet<>();
|
||||
|
||||
for (NodeState nodeState : nodeStatuses) {
|
||||
Set<DatanodeID> nextSet = queryNodeState(nodeState);
|
||||
if ((nextSet == null) || (nextSet.size() == 0)) {
|
||||
// Right now we only support AND operation. So intersect with
|
||||
// any empty set is null.
|
||||
return resultList;
|
||||
}
|
||||
// First time we have to add all the elements, next time we have to
|
||||
// do an intersection operation on the set.
|
||||
if (currentSet.size() == 0) {
|
||||
currentSet.addAll(nextSet);
|
||||
} else {
|
||||
currentSet.retainAll(nextSet);
|
||||
}
|
||||
}
|
||||
|
||||
resultList.addAll(currentSet);
|
||||
return resultList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Query the System for Nodes.
|
||||
*
|
||||
* @param nodeState - NodeState that we are interested in matching.
|
||||
* @return Set of Datanodes that match the NodeState.
|
||||
*/
|
||||
private Set<DatanodeID> queryNodeState(NodeState nodeState) {
|
||||
if (nodeState == NodeState.RAFT_MEMBER ||
|
||||
nodeState == NodeState.FREE_NODE) {
|
||||
throw new IllegalStateException("Not implemented yet");
|
||||
}
|
||||
Set<DatanodeID> returnSet = new TreeSet<>();
|
||||
List<DatanodeID> tmp = getScmNodeManager().getNodes(nodeState);
|
||||
if ((tmp != null) && (tmp.size() > 0)) {
|
||||
returnSet.addAll(tmp);
|
||||
}
|
||||
return returnSet;
|
||||
}
|
||||
|
||||
/**
|
||||
* Asks SCM where a container should be allocated. SCM responds with the set
|
||||
* of datanodes that should be used creating this container.
|
||||
@ -610,14 +700,14 @@ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
|
||||
* @param nodestate Healthy, Dead etc.
|
||||
* @return int -- count
|
||||
*/
|
||||
public int getNodeCount(SCMNodeManager.NODESTATE nodestate) {
|
||||
public int getNodeCount(NodeState nodestate) {
|
||||
return scmNodeManager.getNodeCount(nodestate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Integer> getNodeCount() {
|
||||
Map<String, Integer> countMap = new HashMap<String, Integer>();
|
||||
for (SCMNodeManager.NODESTATE state : SCMNodeManager.NODESTATE.values()) {
|
||||
for (NodeState state : NodeState.values()) {
|
||||
countMap.put(state.toString(), scmNodeManager.getNodeCount(state));
|
||||
}
|
||||
return countMap;
|
||||
|
@ -20,6 +20,7 @@
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
|
||||
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||
@ -103,7 +104,7 @@ public Configuration getConf() {
|
||||
public List<DatanodeID> chooseDatanodes(int nodesRequired, final long
|
||||
sizeRequired) throws SCMException {
|
||||
List<DatanodeID> healthyNodes =
|
||||
nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
|
||||
nodeManager.getNodes(OzoneProtos.NodeState.HEALTHY);
|
||||
String msg;
|
||||
if (healthyNodes.size() == 0) {
|
||||
msg = "No healthy node found to allocate container.";
|
||||
|
@ -20,10 +20,9 @@
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.ozone.scm.node.CommandQueue;
|
||||
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||
import org.apache.hadoop.ozone.scm.node.NodePoolManager;
|
||||
@ -40,11 +39,10 @@
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.google.common.util.concurrent.Uninterruptibles
|
||||
.sleepUninterruptibly;
|
||||
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
|
||||
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
|
||||
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.UNKNOWN;
|
||||
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.STALE;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNOWN;
|
||||
|
||||
/**
|
||||
* These are pools that are actively checking for replication status of the
|
||||
@ -183,7 +181,7 @@ public void startReconciliation() {
|
||||
*/
|
||||
SendContainerCommand cmd = SendContainerCommand.newBuilder().build();
|
||||
for (DatanodeID id : datanodeIDList) {
|
||||
NodeManager.NODESTATE currentState = getNodestate(id);
|
||||
NodeState currentState = getNodestate(id);
|
||||
if (currentState == HEALTHY || currentState == STALE) {
|
||||
nodeCount.incrementAndGet();
|
||||
// Queue commands to all datanodes in this pool to send us container
|
||||
@ -202,8 +200,8 @@ public void startReconciliation() {
|
||||
* @param id - datanode ID.
|
||||
* @return NodeState.
|
||||
*/
|
||||
private NodeManager.NODESTATE getNodestate(DatanodeID id) {
|
||||
NodeManager.NODESTATE currentState = UNKNOWN;
|
||||
private NodeState getNodestate(DatanodeID id) {
|
||||
NodeState currentState = UNKNOWN;
|
||||
int maxTry = 100;
|
||||
// We need to loop to make sure that we will retry if we get
|
||||
// node state unknown. This can lead to infinite loop if we send
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
|
||||
|
||||
@ -62,18 +63,17 @@ public interface NodeManager extends StorageContainerNodeProtocol,
|
||||
|
||||
/**
|
||||
* Gets all Live Datanodes that is currently communicating with SCM.
|
||||
* @param nodestate - State of the node
|
||||
* @param nodeState - State of the node
|
||||
* @return List of Datanodes that are Heartbeating SCM.
|
||||
*/
|
||||
|
||||
List<DatanodeID> getNodes(NODESTATE nodestate);
|
||||
List<DatanodeID> getNodes(NodeState nodeState);
|
||||
|
||||
/**
|
||||
* Returns the Number of Datanodes that are communicating with SCM.
|
||||
* @param nodestate - State of the node
|
||||
* @param nodeState - State of the node
|
||||
* @return int -- count
|
||||
*/
|
||||
int getNodeCount(NODESTATE nodestate);
|
||||
int getNodeCount(NodeState nodeState);
|
||||
|
||||
/**
|
||||
* Get all datanodes known to SCM.
|
||||
@ -102,17 +102,6 @@ public interface NodeManager extends StorageContainerNodeProtocol,
|
||||
*/
|
||||
void clearChillModeFlag();
|
||||
|
||||
/**
|
||||
* Enum that represents the Node State. This is used in calls to getNodeList
|
||||
* and getNodeCount. TODO: Add decommission when we support it.
|
||||
*/
|
||||
enum NODESTATE {
|
||||
HEALTHY,
|
||||
STALE,
|
||||
DEAD,
|
||||
UNKNOWN
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the aggregated node stats.
|
||||
* @return the aggregated node stats.
|
||||
@ -144,5 +133,5 @@ enum NODESTATE {
|
||||
* @param id - DatanodeID
|
||||
* @return Healthy/Stale/Dead.
|
||||
*/
|
||||
NODESTATE getNodeState(DatanodeID id);
|
||||
NodeState getNodeState(DatanodeID id);
|
||||
}
|
||||
|
@ -32,6 +32,7 @@
|
||||
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
|
||||
.ErrorCode;
|
||||
@ -64,6 +65,10 @@
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.DEAD;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.STALE;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNOWN;
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
|
||||
/**
|
||||
@ -212,7 +217,7 @@ public void removeNode(DatanodeID node) throws UnregisteredNodeException {
|
||||
* @return List of Datanodes that are known to SCM in the requested state.
|
||||
*/
|
||||
@Override
|
||||
public List<DatanodeID> getNodes(NODESTATE nodestate)
|
||||
public List<DatanodeID> getNodes(NodeState nodestate)
|
||||
throws IllegalArgumentException {
|
||||
Map<String, Long> set;
|
||||
switch (nodestate) {
|
||||
@ -368,7 +373,7 @@ public void forceEnterChillMode() {
|
||||
* @return int -- count
|
||||
*/
|
||||
@Override
|
||||
public int getNodeCount(NODESTATE nodestate) {
|
||||
public int getNodeCount(NodeState nodestate) {
|
||||
switch (nodestate) {
|
||||
case HEALTHY:
|
||||
return healthyNodeCount.get();
|
||||
@ -383,7 +388,7 @@ public int getNodeCount(NODESTATE nodestate) {
|
||||
// that this information might not be consistent always.
|
||||
return 0;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown node state requested.");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@ -405,7 +410,7 @@ public boolean waitForHeartbeatProcessed() {
|
||||
* @return Healthy/Stale/Dead/Unknown.
|
||||
*/
|
||||
@Override
|
||||
public NODESTATE getNodeState(DatanodeID id) {
|
||||
public NodeState getNodeState(DatanodeID id) {
|
||||
// There is a subtle race condition here, hence we also support
|
||||
// the NODEState.UNKNOWN. It is possible that just before we check the
|
||||
// healthyNodes, we have removed the node from the healthy list but stil
|
||||
@ -415,18 +420,18 @@ public NODESTATE getNodeState(DatanodeID id) {
|
||||
// just deal with the possibilty of getting a state called unknown.
|
||||
|
||||
if(healthyNodes.containsKey(id.getDatanodeUuid())) {
|
||||
return NODESTATE.HEALTHY;
|
||||
return HEALTHY;
|
||||
}
|
||||
|
||||
if(staleNodes.containsKey(id.getDatanodeUuid())) {
|
||||
return NODESTATE.STALE;
|
||||
return STALE;
|
||||
}
|
||||
|
||||
if(deadNodes.containsKey(id.getDatanodeUuid())) {
|
||||
return NODESTATE.DEAD;
|
||||
return DEAD;
|
||||
}
|
||||
|
||||
return NODESTATE.UNKNOWN;
|
||||
return UNKNOWN;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -826,7 +831,7 @@ public SCMNodeMetric getNodeStat(DatanodeID datanodeID) {
|
||||
@Override
|
||||
public Map<String, Integer> getNodeCount() {
|
||||
Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
|
||||
for(NodeManager.NODESTATE state : NodeManager.NODESTATE.values()) {
|
||||
for(NodeState state : NodeState.values()) {
|
||||
nodeCountMap.put(state.toString(), getNodeCount(state));
|
||||
}
|
||||
return nodeCountMap;
|
||||
|
@ -19,11 +19,13 @@
|
||||
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.scm.client.ScmClient;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -121,4 +123,20 @@ public Pipeline createContainer(String containerId,
|
||||
ContainerLookUpService.addContainer(Long.toString(currentContainerId));
|
||||
return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
|
||||
.getPipeline(); }
|
||||
|
||||
/**
|
||||
* Returns a set of Nodes that meet a query criteria.
|
||||
*
|
||||
* @param nodeStatuses - A set of criteria that we want the node to have.
|
||||
* @param queryScope - Query scope - Cluster or pool.
|
||||
* @param poolName - if it is pool, a pool name is required.
|
||||
* @return A set of nodes that meet the requested criteria.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState>
|
||||
nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,6 @@
|
||||
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||
import org.apache.hadoop.ozone.scm.StorageContainerManager;
|
||||
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
@ -56,6 +55,8 @@
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
|
||||
.HEALTHY;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
/**
|
||||
@ -236,7 +237,7 @@ public OzoneRestClient createOzoneRestClient() throws OzoneException {
|
||||
*/
|
||||
public void waitOzoneReady() throws TimeoutException, InterruptedException {
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
final int healthy = scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY);
|
||||
final int healthy = scm.getNodeCount(HEALTHY);
|
||||
final boolean isReady = healthy >= numDataNodes;
|
||||
LOG.info("{}. Got {} of {} DN Heartbeats.",
|
||||
isReady? "Cluster is ready" : "Waiting for cluster to be ready",
|
||||
|
@ -27,6 +27,8 @@
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
|
||||
|
||||
/**
|
||||
* This class manages the state of datanode
|
||||
* in conjunction with the node pool and node managers.
|
||||
@ -74,7 +76,7 @@ public List<ContainerReportsProto> getContainerReport(String containerName,
|
||||
DatanodeID id = nodesInPool.get(r.nextInt(nodesInPool.size()));
|
||||
nodesInPool.remove(id);
|
||||
// We return container reports only for nodes that are healthy.
|
||||
if (nodeManager.getNodeState(id) == NodeManager.NODESTATE.HEALTHY) {
|
||||
if (nodeManager.getNodeState(id) == HEALTHY) {
|
||||
ContainerInfo info = ContainerInfo.newBuilder()
|
||||
.setContainerName(containerName)
|
||||
.setFinalhash(DigestUtils.sha256Hex(containerName))
|
||||
|
@ -32,18 +32,19 @@
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
|
||||
|
||||
/**
|
||||
* A Node Manager to test replication.
|
||||
*/
|
||||
public class ReplicationNodeManagerMock implements NodeManager {
|
||||
private final Map<DatanodeID, NODESTATE> nodeStateMap;
|
||||
private final Map<DatanodeID, NodeState> nodeStateMap;
|
||||
|
||||
/**
|
||||
* A list of Datanodes and current states.
|
||||
* @param nodeState A node state map.
|
||||
*/
|
||||
public ReplicationNodeManagerMock(Map<DatanodeID, NODESTATE> nodeState) {
|
||||
public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState) {
|
||||
Preconditions.checkNotNull(nodeState);
|
||||
nodeStateMap = nodeState;
|
||||
}
|
||||
@ -118,7 +119,7 @@ public void removeNode(DatanodeID node) throws UnregisteredNodeException {
|
||||
* @return List of Datanodes that are Heartbeating SCM.
|
||||
*/
|
||||
@Override
|
||||
public List<DatanodeID> getNodes(NODESTATE nodestate) {
|
||||
public List<DatanodeID> getNodes(NodeState nodestate) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -129,7 +130,7 @@ public List<DatanodeID> getNodes(NODESTATE nodestate) {
|
||||
* @return int -- count
|
||||
*/
|
||||
@Override
|
||||
public int getNodeCount(NODESTATE nodestate) {
|
||||
public int getNodeCount(NodeState nodestate) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -220,7 +221,7 @@ public boolean waitForHeartbeatProcessed() {
|
||||
* @return Healthy/Stale/Dead.
|
||||
*/
|
||||
@Override
|
||||
public NODESTATE getNodeState(DatanodeID id) {
|
||||
public NodeState getNodeState(DatanodeID id) {
|
||||
return nodeStateMap.get(id);
|
||||
}
|
||||
|
||||
@ -308,7 +309,7 @@ public void clearMap() {
|
||||
* @param id - DatanodeID
|
||||
* @param state State you want to put that node to.
|
||||
*/
|
||||
public void addNode(DatanodeID id, NODESTATE state) {
|
||||
public void addNode(DatanodeID id, NodeState state) {
|
||||
nodeStateMap.put(id, state);
|
||||
}
|
||||
|
||||
|
@ -31,6 +31,8 @@
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
|
||||
.HEALTHY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
@ -40,7 +42,7 @@ public class TestContainerPlacement {
|
||||
|
||||
private DescriptiveStatistics computeStatistics(NodeManager nodeManager) {
|
||||
DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics();
|
||||
for (DatanodeID id : nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY)) {
|
||||
for (DatanodeID id : nodeManager.getNodes(HEALTHY)) {
|
||||
float weightedValue =
|
||||
nodeManager.getNodeStat(id).get().getScmUsed().get() / (float)
|
||||
nodeManager.getNodeStat(id).get().getCapacity().get();
|
||||
|
@ -25,6 +25,7 @@
|
||||
import org.apache.hadoop.ozone.container.TestUtils
|
||||
.ReplicationNodePoolManagerMock;
|
||||
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.ozone.scm.container.replication
|
||||
@ -49,6 +50,7 @@
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS;
|
||||
import static org.apache.ratis.shaded.com.google.common.util.concurrent
|
||||
@ -80,13 +82,13 @@ public void tearDown() throws Exception {
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.DEBUG);
|
||||
Map<DatanodeID, NodeManager.NODESTATE> nodeStateMap = new HashMap<>();
|
||||
Map<DatanodeID, NodeState> nodeStateMap = new HashMap<>();
|
||||
// We are setting up 3 pools with 24 nodes each in this cluster.
|
||||
// First we create 72 Datanodes.
|
||||
for (int x = 0; x < MAX_DATANODES; x++) {
|
||||
DatanodeID datanode = SCMTestUtils.getDatanodeID();
|
||||
datanodes.add(datanode);
|
||||
nodeStateMap.put(datanode, NodeManager.NODESTATE.HEALTHY);
|
||||
nodeStateMap.put(datanode, HEALTHY);
|
||||
}
|
||||
|
||||
// All nodes in this cluster are healthy for time being.
|
||||
@ -239,8 +241,7 @@ public void testAddingNewPoolWorks()
|
||||
GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG);
|
||||
try {
|
||||
DatanodeID id = SCMTestUtils.getDatanodeID();
|
||||
((ReplicationNodeManagerMock) (nodeManager)).addNode(id, NodeManager
|
||||
.NODESTATE.HEALTHY);
|
||||
((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY);
|
||||
poolManager.addNode("PoolNew", id);
|
||||
GenericTestUtils.waitFor(() ->
|
||||
logCapturer.getOutput().contains("PoolNew"),
|
||||
|
@ -5,9 +5,9 @@
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
||||
import org.apache.hadoop.ozone.protocol.VersionResponse;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
@ -36,13 +37,18 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.DEAD;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
|
||||
.HEALTHY;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
|
||||
.STALE;
|
||||
|
||||
/**
|
||||
* Test Helper for testing container Mapping.
|
||||
*/
|
||||
public class MockNodeManager implements NodeManager {
|
||||
private static final int HEALTHY_NODE_COUNT = 10;
|
||||
private final static NodeData[] NODES = {
|
||||
new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB),
|
||||
new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB),
|
||||
new NodeData(64L * OzoneConsts.TB, 100 * OzoneConsts.GB),
|
||||
new NodeData(128L * OzoneConsts.TB, 256 * OzoneConsts.GB),
|
||||
new NodeData(40L * OzoneConsts.TB, OzoneConsts.TB),
|
||||
@ -50,20 +56,26 @@ public class MockNodeManager implements NodeManager {
|
||||
new NodeData(20L * OzoneConsts.TB, 10 * OzoneConsts.GB),
|
||||
new NodeData(32L * OzoneConsts.TB, 16 * OzoneConsts.TB),
|
||||
new NodeData(OzoneConsts.TB, 900 * OzoneConsts.GB),
|
||||
new NodeData(OzoneConsts.TB, 900 * OzoneConsts.GB, NodeData.STALE),
|
||||
new NodeData(OzoneConsts.TB, 200L * OzoneConsts.GB, NodeData.STALE),
|
||||
new NodeData(OzoneConsts.TB, 200L * OzoneConsts.GB, NodeData.DEAD)
|
||||
};
|
||||
private final List<DatanodeID> healthyNodes;
|
||||
private final List<DatanodeID> staleNodes;
|
||||
private final List<DatanodeID> deadNodes;
|
||||
private final Map<String, SCMNodeStat> nodeMetricMap;
|
||||
private final SCMNodeStat aggregateStat;
|
||||
private boolean chillmode;
|
||||
|
||||
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
|
||||
this.healthyNodes = new LinkedList<>();
|
||||
this.staleNodes = new LinkedList<>();
|
||||
this.deadNodes = new LinkedList<>();
|
||||
this.nodeMetricMap = new HashMap<>();
|
||||
aggregateStat = new SCMNodeStat();
|
||||
if (initializeFakeNodes) {
|
||||
for (int x = 0; x < nodeCount; x++) {
|
||||
DatanodeID id = SCMTestUtils.getDatanodeID();
|
||||
healthyNodes.add(id);
|
||||
populateNodeMetric(id, x);
|
||||
}
|
||||
}
|
||||
@ -84,6 +96,19 @@ private void populateNodeMetric(DatanodeID datanodeID, int x) {
|
||||
(NODES[x % NODES.length].used), remaining);
|
||||
this.nodeMetricMap.put(datanodeID.toString(), newStat);
|
||||
aggregateStat.add(newStat);
|
||||
|
||||
if (NODES[x % NODES.length].getCurrentState() == NodeData.HEALTHY) {
|
||||
healthyNodes.add(datanodeID);
|
||||
}
|
||||
|
||||
if (NODES[x % NODES.length].getCurrentState() == NodeData.STALE) {
|
||||
staleNodes.add(datanodeID);
|
||||
}
|
||||
|
||||
if (NODES[x % NODES.length].getCurrentState() == NodeData.DEAD) {
|
||||
deadNodes.add(datanodeID);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -112,10 +137,19 @@ public void removeNode(DatanodeID node) throws UnregisteredNodeException {
|
||||
* @return List of Datanodes that are Heartbeating SCM.
|
||||
*/
|
||||
@Override
|
||||
public List<DatanodeID> getNodes(NODESTATE nodestate) {
|
||||
if (nodestate == NODESTATE.HEALTHY) {
|
||||
public List<DatanodeID> getNodes(OzoneProtos.NodeState nodestate) {
|
||||
if (nodestate == HEALTHY) {
|
||||
return healthyNodes;
|
||||
}
|
||||
|
||||
if (nodestate == STALE) {
|
||||
return staleNodes;
|
||||
}
|
||||
|
||||
if (nodestate == DEAD) {
|
||||
return deadNodes;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -126,9 +160,10 @@ public List<DatanodeID> getNodes(NODESTATE nodestate) {
|
||||
* @return int -- count
|
||||
*/
|
||||
@Override
|
||||
public int getNodeCount(NODESTATE nodestate) {
|
||||
if (nodestate == NODESTATE.HEALTHY) {
|
||||
return HEALTHY_NODE_COUNT;
|
||||
public int getNodeCount(OzoneProtos.NodeState nodestate) {
|
||||
List<DatanodeID> nodes = getNodes(nodestate);
|
||||
if (nodes != null) {
|
||||
return nodes.size();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@ -258,7 +293,7 @@ public boolean waitForHeartbeatProcessed() {
|
||||
* @return Healthy/Stale/Dead.
|
||||
*/
|
||||
@Override
|
||||
public NODESTATE getNodeState(DatanodeID id) {
|
||||
public OzoneProtos.NodeState getNodeState(DatanodeID id) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -341,7 +376,7 @@ public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
|
||||
for (StorageContainerDatanodeProtocolProtos.SCMStorageReport report :
|
||||
storageReports) {
|
||||
totalCapacity += report.getCapacity();
|
||||
totalRemaining +=report.getRemaining();
|
||||
totalRemaining += report.getRemaining();
|
||||
totalScmUsed += report.getScmUsed();
|
||||
}
|
||||
aggregateStat.subtract(stat);
|
||||
@ -356,7 +391,7 @@ public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
|
||||
@Override
|
||||
public Map<String, Integer> getNodeCount() {
|
||||
Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
|
||||
for (NodeManager.NODESTATE state : NodeManager.NODESTATE.values()) {
|
||||
for (OzoneProtos.NodeState state : OzoneProtos.NodeState.values()) {
|
||||
nodeCountMap.put(state.toString(), getNodeCount(state));
|
||||
}
|
||||
return nodeCountMap;
|
||||
@ -399,17 +434,35 @@ public void delContainer(DatanodeID datanodeID, long size) {
|
||||
* won't fail.
|
||||
*/
|
||||
private static class NodeData {
|
||||
private long capacity, used;
|
||||
public static final long HEALTHY = 1;
|
||||
public static final long STALE = 2;
|
||||
public static final long DEAD = 3;
|
||||
|
||||
private long capacity;
|
||||
private long used;
|
||||
|
||||
private long currentState;
|
||||
|
||||
/**
|
||||
* By default nodes are healthy.
|
||||
* @param capacity
|
||||
* @param used
|
||||
*/
|
||||
NodeData(long capacity, long used) {
|
||||
this(capacity, used, HEALTHY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a nodeDefinition.
|
||||
*
|
||||
* @param capacity capacity.
|
||||
* @param used used.
|
||||
* @param currentState - Healthy, Stale and DEAD nodes.
|
||||
*/
|
||||
NodeData(long capacity, long used) {
|
||||
NodeData(long capacity, long used, long currentState) {
|
||||
this.capacity = capacity;
|
||||
this.used = used;
|
||||
this.currentState = currentState;
|
||||
}
|
||||
|
||||
public long getCapacity() {
|
||||
@ -427,5 +480,14 @@ public long getUsed() {
|
||||
public void setUsed(long used) {
|
||||
this.used = used;
|
||||
}
|
||||
|
||||
public long getCurrentState() {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
public void setCurrentState(long currentState) {
|
||||
this.currentState = currentState;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -46,9 +46,10 @@
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
|
||||
.HEALTHY;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
|
||||
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
|
||||
import static org.hamcrest.core.StringStartsWith.startsWith;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
@ -49,11 +50,14 @@
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.DEAD;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
|
||||
.HEALTHY;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
|
||||
.STALE;
|
||||
import static org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.Type;
|
||||
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD;
|
||||
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
|
||||
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
|
||||
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
|
||||
@ -413,8 +417,7 @@ public void testScmDetectStaleAndDeadNode() throws IOException,
|
||||
// Wait for 2 seconds, wait a total of 4 seconds to make sure that the
|
||||
// node moves into stale state.
|
||||
Thread.sleep(2 * 1000);
|
||||
List<DatanodeID> staleNodeList = nodeManager.getNodes(NodeManager
|
||||
.NODESTATE.STALE);
|
||||
List<DatanodeID> staleNodeList = nodeManager.getNodes(STALE);
|
||||
assertEquals("Expected to find 1 stale node",
|
||||
1, nodeManager.getNodeCount(STALE));
|
||||
assertEquals("Expected to find 1 stale node",
|
||||
@ -433,8 +436,7 @@ public void testScmDetectStaleAndDeadNode() throws IOException,
|
||||
Thread.sleep(2 * 1000);
|
||||
|
||||
// the stale node has been removed
|
||||
staleNodeList = nodeManager.getNodes(NodeManager
|
||||
.NODESTATE.STALE);
|
||||
staleNodeList = nodeManager.getNodes(STALE);
|
||||
assertEquals("Expected to find 1 stale node",
|
||||
0, nodeManager.getNodeCount(STALE));
|
||||
assertEquals("Expected to find 1 stale node",
|
||||
@ -682,7 +684,7 @@ private List<DatanodeID> createNodeSet(SCMNodeManager nodeManager, int
|
||||
* @return true if we found the expected number.
|
||||
*/
|
||||
private boolean findNodes(NodeManager nodeManager, int count,
|
||||
NodeManager.NODESTATE state) {
|
||||
OzoneProtos.NodeState state) {
|
||||
return count == nodeManager.getNodeCount(state);
|
||||
}
|
||||
|
||||
@ -1056,7 +1058,7 @@ public void testScmNodeReportUpdate() throws IOException,
|
||||
// Wait up to 4s so that the node becomes stale
|
||||
// Verify the usage info should be unchanged.
|
||||
GenericTestUtils.waitFor(
|
||||
() -> nodeManager.getNodeCount(NodeManager.NODESTATE.STALE) == 1, 100,
|
||||
() -> nodeManager.getNodeCount(STALE) == 1, 100,
|
||||
4 * 1000);
|
||||
assertEquals(nodeCount, nodeManager.getNodeStats().size());
|
||||
|
||||
@ -1074,7 +1076,7 @@ public void testScmNodeReportUpdate() throws IOException,
|
||||
// Wait up to 4 more seconds so the node becomes dead
|
||||
// Verify usage info should be updated.
|
||||
GenericTestUtils.waitFor(
|
||||
() -> nodeManager.getNodeCount(NodeManager.NODESTATE.DEAD) == 1, 100,
|
||||
() -> nodeManager.getNodeCount(DEAD) == 1, 100,
|
||||
4 * 1000);
|
||||
|
||||
assertEquals(0, nodeManager.getNodeStats().size());
|
||||
@ -1099,7 +1101,7 @@ public void testScmNodeReportUpdate() throws IOException,
|
||||
// Wait up to 5 seconds so that the dead node becomes healthy
|
||||
// Verify usage info should be updated.
|
||||
GenericTestUtils.waitFor(
|
||||
() -> nodeManager.getNodeCount(NodeManager.NODESTATE.HEALTHY) == 1,
|
||||
() -> nodeManager.getNodeCount(HEALTHY) == 1,
|
||||
100, 5 * 1000);
|
||||
GenericTestUtils.waitFor(
|
||||
() -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed,
|
||||
|
@ -0,0 +1,124 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.scm.node;
|
||||
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.scm.client.ContainerOperationClient;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.EnumSet;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.DEAD;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
|
||||
.HEALTHY;
|
||||
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
|
||||
.STALE;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_DEADNODE_INTERVAL_MS;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys
|
||||
.OZONE_SCM_STALENODE_INTERVAL_MS;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* Test Query Node Operation.
|
||||
*/
|
||||
public class TestQueryNode {
|
||||
private static int numOfDatanodes = 5;
|
||||
private MiniOzoneCluster cluster;
|
||||
|
||||
private ContainerOperationClient scmClient;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
final int interval = 100;
|
||||
|
||||
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
|
||||
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1, SECONDS);
|
||||
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
|
||||
conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
|
||||
|
||||
cluster = new MiniOzoneCluster.Builder(conf)
|
||||
.numDataNodes(numOfDatanodes)
|
||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
|
||||
.build();
|
||||
cluster.waitOzoneReady();
|
||||
scmClient = new ContainerOperationClient(cluster
|
||||
.createStorageContainerLocationClient(),
|
||||
new XceiverClientManager(conf));
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHealthyNodesCount() throws Exception {
|
||||
OzoneProtos.NodePool pool = scmClient.queryNode(
|
||||
EnumSet.of(HEALTHY),
|
||||
OzoneProtos.QueryScope.CLUSTER, "");
|
||||
assertEquals("Expected live nodes", numOfDatanodes,
|
||||
pool.getNodesCount());
|
||||
}
|
||||
|
||||
@Test(timeout = 10 * 1000L)
|
||||
public void testStaleNodesCount() throws Exception {
|
||||
cluster.shutdownDataNode(0);
|
||||
cluster.shutdownDataNode(1);
|
||||
|
||||
GenericTestUtils.waitFor(() ->
|
||||
cluster.getStorageContainerManager().getNodeCount(STALE) == 2,
|
||||
100, 4 * 1000);
|
||||
|
||||
int nodeCount = scmClient.queryNode(EnumSet.of(STALE),
|
||||
OzoneProtos.QueryScope.CLUSTER, "").getNodesCount();
|
||||
assertEquals("Mismatch of expected nodes count", 2, nodeCount);
|
||||
|
||||
GenericTestUtils.waitFor(() ->
|
||||
cluster.getStorageContainerManager().getNodeCount(DEAD) == 2,
|
||||
100, 4 * 1000);
|
||||
|
||||
// Assert that we don't find any stale nodes.
|
||||
nodeCount = scmClient.queryNode(EnumSet.of(STALE),
|
||||
OzoneProtos.QueryScope.CLUSTER, "").getNodesCount();
|
||||
assertEquals("Mismatch of expected nodes count", 0, nodeCount);
|
||||
|
||||
// Assert that we find the expected number of dead nodes.
|
||||
nodeCount = scmClient.queryNode(EnumSet.of(DEAD),
|
||||
OzoneProtos.QueryScope.CLUSTER, "").getNodesCount();
|
||||
assertEquals("Mismatch of expected nodes count", 2, nodeCount);
|
||||
|
||||
cluster.restartDataNode(0);
|
||||
cluster.restartDataNode(1);
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user