HDFS-11001. Ozone:SCM: Add support for registerNode in SCM. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2016-10-26 17:33:37 -07:00 committed by Owen O'Malley
parent 964daed853
commit e49e305f25
12 changed files with 1004 additions and 224 deletions

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import java.util.List;
/**
* The protocol spoken between datanodes and SCM.
*
* Please note that the full protocol spoken between a datanode and SCM is
* separated into 2 interfaces. One interface that deals with node state and
* another interface that deals with containers.
*
* This interface has functions that deals with the state of datanode.
*/
@InterfaceAudience.Private
public interface StorageContainerNodeProtocol {
/**
* Gets the version info from SCM.
* @param versionRequest - version Request.
* @return - returns SCM version info and other required information needed
* by datanode.
*/
VersionResponse getVersion(SCMVersionRequestProto versionRequest);
/**
* Register the node if the node finds that it is not registered with any SCM.
* @param datanodeID - Send datanodeID with Node info, but datanode UUID is
* empty. Server returns a datanodeID for the given node.
* @return SCMHeartbeatResponseProto
*/
SCMCommand register(DatanodeID datanodeID);
/**
* Send heartbeat to indicate the datanode is alive and doing well.
* @param datanodeID - Datanode ID.
* @return SCMheartbeat response list
*/
List<SCMCommand> sendHeartbeat(DatanodeID datanodeID);
}

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Version response class.
*/
public class VersionResponse {
private final int version;
private final Map<String, String> values;
/**
* Creates a version response class.
* @param version
* @param values
*/
public VersionResponse(int version, Map<String, String> values) {
this.version = version;
this.values = values;
}
/**
* Creates a version Response class.
* @param version
*/
public VersionResponse(int version) {
this.version = version;
this.values = new HashMap<>();
}
/**
* Returns a new Builder.
* @return - Builder.
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* Returns this class from protobuf message.
* @param response - SCMVersionResponseProto
* @return VersionResponse
*/
public static VersionResponse getFromProtobuf(SCMVersionResponseProto
response) {
return new VersionResponse(response.getSoftwareVersion(),
response.getKeysList().stream()
.collect(Collectors.toMap(KeyValue::getKey,
KeyValue::getValue)));
}
/**
* Adds a value to version Response.
* @param key - String
* @param value - String
*/
public void put(String key, String value) {
if (this.values.containsKey(key)) {
throw new IllegalArgumentException("Duplicate key in version response");
}
values.put(key, value);
}
/**
* Return a protobuf message.
* @return SCMVersionResponseProto.
*/
public SCMVersionResponseProto getProtobufMessage() {
List<KeyValue> list = new LinkedList<>();
for (Map.Entry<String, String> entry : values.entrySet()) {
list.add(KeyValue.newBuilder().setKey(entry.getKey()).
setValue(entry.getValue()).build());
}
return
SCMVersionResponseProto.newBuilder()
.setSoftwareVersion(this.version)
.addAllKeys(list).build();
}
/**
* Builder class.
*/
public static class Builder {
private int version;
private Map<String, String> values;
Builder() {
values = new HashMap<>();
}
/**
* Sets the version.
* @param ver - version
* @return Builder
*/
public Builder setVersion(int ver) {
this.version = ver;
return this;
}
/**
* Adds a value to version Response.
* @param key - String
* @param value - String
*/
public Builder addValue(String key, String value) {
if (this.values.containsKey(key)) {
throw new IllegalArgumentException("Duplicate key in version response");
}
values.put(key, value);
return this;
}
/**
* Builds the version response.
* @return VersionResponse.
*/
public VersionResponse build() {
return new VersionResponse(this.version, this.values);
}
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
/**
* For each command that SCM can return we have a class in this commands
* directory. This is the Null command, that tells datanode that no action is
* needed from it.
*/
public class NullCommand extends SCMCommand<NullCmdResponseProto> {
/**
* Returns the type of this command.
*
* @return Type
*/
@Override
public Type getType() {
return Type.nullCmd;
}
/**
* Gets the protobuf message of this object.
*
* @return A protobuf message.
*/
@Override
public NullCmdResponseProto getProtoBufMessage() {
return NullCmdResponseProto.newBuilder().build();
}
/**
* Returns a NullCommand class from NullCommandResponse Proto.
* @param unused - unused
* @return NullCommand
*/
public static NullCommand getFromProtobuf(final NullCmdResponseProto
unused) {
return new NullCommand();
}
/**
* returns a new builder.
* @return Builder
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* A Builder class this is the standard pattern we are using for all commands.
*/
public static class Builder {
/**
* Return a null command.
* @return - NullCommand.
*/
public NullCommand build() {
return new NullCommand();
}
}
}

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
/**
* Response to Datanode Register call.
*/
public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> {
private String datanodeUUID;
private String clusterID;
private ErrorCode error;
public RegisteredCommand(final ErrorCode error, final String datanodeUUID,
final String clusterID) {
this.datanodeUUID = datanodeUUID;
this.clusterID = clusterID;
this.error = error;
}
/**
* Returns a new builder.
*
* @return - Builder
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* Returns the type of this command.
*
* @return Type
*/
@Override
Type getType() {
return Type.registeredCmd;
}
/**
* Returns datanode UUID.
* @return - Datanode ID.
*/
public String getDatanodeUUID() {
return datanodeUUID;
}
/**
* Returns cluster ID.
* @return -- ClusterID
*/
public String getClusterID() {
return clusterID;
}
/**
* Returns ErrorCode.
* @return - ErrorCode
*/
public ErrorCode getError() {
return error;
}
/**
* Gets the protobuf message of this object.
*
* @return A protobuf message.
*/
@Override
RegisteredCmdResponseProto getProtoBufMessage() {
return RegisteredCmdResponseProto.newBuilder()
.setClusterID(this.clusterID)
.setDatanodeUUID(this.datanodeUUID)
.setErrorCode(this.error)
.build();
}
/**
* A builder class to verify all values are sane.
*/
public static class Builder {
private String datanodeUUID;
private String clusterID;
private ErrorCode error;
/**
* sets UUID.
*
* @param dnUUID - datanode UUID
* @return Builder
*/
public Builder setDatanodeUUID(String dnUUID) {
this.datanodeUUID = dnUUID;
return this;
}
/**
* Create this object from a Protobuf message.
*
* @param response - RegisteredCmdResponseProto
* @return RegisteredCommand
*/
public RegisteredCommand getFromProtobuf(RegisteredCmdResponseProto
response) {
Preconditions.checkNotNull(response);
return new RegisteredCommand(response.getErrorCode(),
response.hasDatanodeUUID() ? response.getDatanodeUUID(): "",
response.hasClusterID() ? response.getClusterID(): "");
}
/**
* Sets cluster ID.
*
* @param cluster - clusterID
* @return Builder
*/
public Builder setClusterID(String cluster) {
this.clusterID = cluster;
return this;
}
/**
* Sets Error code.
*
* @param errorCode - error code
* @return Builder
*/
public Builder setErrorCode(ErrorCode errorCode) {
this.error = errorCode;
return this;
}
/**
* Build the command object.
*
* @return RegisteredCommand
*/
public RegisteredCommand build() {
if ((this.error == ErrorCode.success) &&
(this.datanodeUUID == null || this.datanodeUUID.isEmpty()) ||
(this.clusterID == null || this.clusterID.isEmpty())) {
throw new IllegalArgumentException("On success, RegisteredCommand " +
"needs datanodeUUID and ClusterID.");
}
return new
RegisteredCommand(this.error, this.datanodeUUID, this.clusterID);
}
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
import com.google.protobuf.GeneratedMessage;
/**
* A class that acts as the base class to convert between Java and SCM
* commands in protobuf format.
* @param <T>
*/
public abstract class SCMCommand<T extends GeneratedMessage> {
/**
* Returns the type of this command.
* @return Type
*/
abstract Type getType();
/**
* Gets the protobuf message of this object.
* @return A protobuf message.
*/
abstract T getProtoBufMessage();
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
/**
Set of classes that help in protoc conversions.
**/

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
/**
* This is a class that tracks versions of SCM.
*/
public final class VersionInfo {
// We will just be normal and use positive counting numbers for versions.
private final static VersionInfo[] VERSION_INFOS =
{new VersionInfo("First version of SCM", 1)};
private final String description;
private final int version;
/**
* Never created outside this class.
*
* @param description -- description
* @param version -- version number
*/
private VersionInfo(String description, int version) {
this.description = description;
this.version = version;
}
/**
* Returns all versions.
*
* @return Version info array.
*/
public static VersionInfo[] getAllVersions() {
return VERSION_INFOS.clone();
}
/**
* Returns the latest version.
*
* @return versionInfo
*/
public static VersionInfo getLatestVersion() {
return VERSION_INFOS[VERSION_INFOS.length - 1];
}
/**
* Return description.
*
* @return String
*/
public String getDescription() {
return description;
}
/**
* Return the version.
*
* @return int.
*/
public int getVersion() {
return version;
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.node;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.NullCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Command Queue is queue of commands for the datanode.
* <p>
* Node manager, container Manager and key space managers can queue commands for
* datanodes into this queue. These commands will be send in the order in which
* there where queued.
*/
public class CommandQueue {
private final Map<DatanodeID, List<SCMCommand>> commandMap;
private final Lock lock;
private final List<SCMCommand> nullList;
/**
* Constructs a Command Queue.
*/
public CommandQueue() {
commandMap = new HashMap<>();
lock = new ReentrantLock();
nullList = new LinkedList<>();
nullList.add(NullCommand.newBuilder().build());
}
/**
* Returns a list of Commands for the datanode to execute, if we have no
* commands returns a list with Null Command otherwise the current set of
* commands are returned and command map set to empty list again.
*
* @param datanodeID DatanodeID
* @return List of SCM Commands.
*/
@SuppressWarnings("unchecked")
List<SCMCommand> getCommand(final DatanodeID datanodeID) {
lock.lock();
try {
if (commandMap.containsKey(datanodeID)) {
List temp = commandMap.get(datanodeID);
if (temp.size() > 0) {
LinkedList<SCMCommand> emptyList = new LinkedList<>();
commandMap.put(datanodeID, emptyList);
return temp;
}
}
} finally {
lock.unlock();
}
return nullList;
}
/**
* Adds a Command to the SCM Queue to send the command to container.
*
* @param datanodeID DatanodeID
* @param command - Command
*/
void addCommand(final DatanodeID datanodeID, final SCMCommand command) {
lock.lock();
try {
if (commandMap.containsKey(datanodeID)) {
commandMap.get(datanodeID).add(command);
} else {
LinkedList<SCMCommand> newList = new LinkedList<>();
newList.add(command);
commandMap.put(datanodeID, newList);
}
} finally {
lock.unlock();
}
}
}

View File

@ -1,24 +1,24 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with this * or more contributor license agreements. See the NOTICE file
* work for additional information regarding copyright ownership. The ASF * distributed with this work for additional information
* licenses this file to you under the Apache License, Version 2.0 (the * regarding copyright ownership. The ASF licenses this file
* "License"); you may not use this file except in compliance with the License. * to you under the Apache License, Version 2.0 (the
* You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* <p/> * with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0 *
* <p/> * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * distributed under the License is distributed on an "AS IS" BASIS,
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* License for the specific language governing permissions and limitations under * See the License for the specific language governing permissions and
* the License. * limitations under the License.
*/ */
package org.apache.hadoop.ozone.scm.node; package org.apache.hadoop.ozone.scm.node;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.UnresolvedTopologyException;
import java.io.Closeable; import java.io.Closeable;
import java.util.List; import java.util.List;
@ -47,21 +47,6 @@
*/ */
public interface NodeManager extends Closeable, Runnable { public interface NodeManager extends Closeable, Runnable {
/**
* Update the heartbeat timestamp.
*
* @param datanodeID - Name of the datanode that send us heatbeat.
*/
void updateHeartbeat(DatanodeID datanodeID);
/**
* Add a New Datanode to the NodeManager.
*
* @param nodeReg - Datanode ID.
* @throws UnresolvedTopologyException
*/
void registerNode(DatanodeID nodeReg)
throws UnresolvedTopologyException;
/** /**
* Removes a data node from the management of this Node Manager. * Removes a data node from the management of this Node Manager.
@ -73,7 +58,7 @@ void registerNode(DatanodeID nodeReg)
/** /**
* Gets all Live Datanodes that is currently communicating with SCM. * Gets all Live Datanodes that is currently communicating with SCM.
* * @param nodestate - State of the node
* @return List of Datanodes that are Heartbeating SCM. * @return List of Datanodes that are Heartbeating SCM.
*/ */
@ -81,7 +66,7 @@ void registerNode(DatanodeID nodeReg)
/** /**
* Returns the Number of Datanodes that are communicating with SCM. * Returns the Number of Datanodes that are communicating with SCM.
* * @param nodestate - State of the node
* @return int -- count * @return int -- count
*/ */
int getNodeCount(NODESTATE nodestate); int getNodeCount(NODESTATE nodestate);

View File

@ -1,18 +1,19 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with this * or more contributor license agreements. See the NOTICE file
* work for additional information regarding copyright ownership. The ASF * distributed with this work for additional information
* licenses this file to you under the Apache License, Version 2.0 (the * regarding copyright ownership. The ASF licenses this file
* "License"); you may not use this file except in compliance with the License. * to you under the Apache License, Version 2.0 (the
* You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* <p/> * with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0 *
* <p/> * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * distributed under the License is distributed on an "AS IS" BASIS,
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* License for the specific language governing permissions and limitations under * See the License for the specific language governing permissions and
* the License. * limitations under the License.
*/ */
package org.apache.hadoop.ozone.scm.node; package org.apache.hadoop.ozone.scm.node;
@ -24,6 +25,14 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.scm.VersionInfo;
import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,6 +43,7 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -45,38 +55,30 @@
/** /**
* Maintains information about the Datanodes on SCM side. * Maintains information about the Datanodes on SCM side.
* <p/> * <p>
* Heartbeats under SCM is very simple compared to HDFS heartbeatManager. * Heartbeats under SCM is very simple compared to HDFS heartbeatManager.
* <p/> * <p>
* Here we maintain 3 maps, and we propagate a node from healthyNodesMap to * Here we maintain 3 maps, and we propagate a node from healthyNodesMap to
* staleNodesMap to deadNodesMap. This moving of a node from one map to * staleNodesMap to deadNodesMap. This moving of a node from one map to another
* another is controlled by 4 configuration variables. These variables define * is controlled by 4 configuration variables. These variables define how many
* how many heartbeats must go missing for the node to move from one map to * heartbeats must go missing for the node to move from one map to another.
* another. * <p>
* <p/>
* Each heartbeat that SCMNodeManager receives is put into heartbeatQueue. The * Each heartbeat that SCMNodeManager receives is put into heartbeatQueue. The
* worker thread wakes up and grabs that heartbeat from the queue. The worker * worker thread wakes up and grabs that heartbeat from the queue. The worker
* thread will lookup the healthynodes map and update the timestamp if the entry * thread will lookup the healthynodes map and update the timestamp if the entry
* is there. if not it will look up stale and deadnodes map. * is there. if not it will look up stale and deadnodes map.
* <p/> * <p>
* * The getNode(byState) functions make copy of node maps and then creates a list
* TODO: Replace with Node Registration code. * based on that. It should be assumed that these get functions always report
* if the node is not found in any of these tables it is treated as new node for * *stale* information. For example, getting the deadNodeCount followed by
* time being and added to the healthy nodes list.
*
* <p/>
*
* The getNode(byState) functions make copy of node maps and then creates a
* list based on that. It should be assumed that these get functions always
* report *stale* information. For example, getting the deadNodeCount
* followed by
* getNodes(DEAD) could very well produce totally different count. Also * getNodes(DEAD) could very well produce totally different count. Also
* getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCode(STALE), is not * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCode(STALE), is not
* guaranteed to add up to the total nodes that we know off. Please treat all * guaranteed to add up to the total nodes that we know off. Please treat all
* get functions in this file as a snap-shot of information that is * get functions in this file as a snap-shot of information that is inconsistent
* inconsistent as soon as you read it. * as soon as you read it.
*/ */
public class SCMNodeManager implements NodeManager { public class SCMNodeManager
implements NodeManager, StorageContainerNodeProtocol {
@VisibleForTesting @VisibleForTesting
static final Logger LOG = static final Logger LOG =
@ -103,12 +105,15 @@ public class SCMNodeManager implements NodeManager {
private long lastHBProcessedCount; private long lastHBProcessedCount;
private int chillModeNodeCount; private int chillModeNodeCount;
private final int maxHBToProcessPerLoop; private final int maxHBToProcessPerLoop;
private final String clusterID;
private final VersionInfo version;
private Optional<Boolean> inManualChillMode; private Optional<Boolean> inManualChillMode;
private final CommandQueue commandQueue;
/** /**
* Constructs SCM machine Manager. * Constructs SCM machine Manager.
*/ */
public SCMNodeManager(Configuration conf) { public SCMNodeManager(Configuration conf, String clusterID) {
heartbeatQueue = new ConcurrentLinkedQueue<>(); heartbeatQueue = new ConcurrentLinkedQueue<>();
healthyNodes = new ConcurrentHashMap<>(); healthyNodes = new ConcurrentHashMap<>();
deadNodes = new ConcurrentHashMap<>(); deadNodes = new ConcurrentHashMap<>();
@ -119,6 +124,9 @@ public SCMNodeManager(Configuration conf) {
staleNodeCount = new AtomicInteger(0); staleNodeCount = new AtomicInteger(0);
deadNodeCount = new AtomicInteger(0); deadNodeCount = new AtomicInteger(0);
totalNodes = new AtomicInteger(0); totalNodes = new AtomicInteger(0);
this.clusterID = clusterID;
this.version = VersionInfo.getLatestVersion();
commandQueue = new CommandQueue();
// TODO: Support this value as a Percentage of known machines. // TODO: Support this value as a Percentage of known machines.
chillModeNodeCount = 1; chillModeNodeCount = 1;
@ -133,54 +141,13 @@ public SCMNodeManager(Configuration conf) {
executorService = HadoopExecutors.newScheduledThreadPool(1, executorService = HadoopExecutors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true) new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("SCM Heartbeat Processing Thread - %d").build()); .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
this.inManualChillMode = Optional.absent(); this.inManualChillMode = Optional.absent();
Preconditions.checkState(heartbeatCheckerIntervalMs > 0); Preconditions.checkState(heartbeatCheckerIntervalMs > 0);
executorService.schedule(this, heartbeatCheckerIntervalMs, executorService.schedule(this, heartbeatCheckerIntervalMs,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
} }
/**
* Add a New Datanode to the NodeManager. This function is invoked with
* synchronised(this) being held.
*
* @param nodeReg - node to register
*/
@Override
public void registerNode(DatanodeID nodeReg) {
if (nodes.containsKey(nodeReg.getDatanodeUuid())) {
LOG.error("Datanode is already registered. Datanode: {}",
nodeReg.toString());
return;
}
nodes.put(nodeReg.getDatanodeUuid(), nodeReg);
totalNodes.incrementAndGet();
healthyNodes.put(nodeReg.getDatanodeUuid(), monotonicNow());
healthyNodeCount.incrementAndGet();
LOG.info("Data node with ID: {} Registered.", nodeReg.getDatanodeUuid());
}
/**
* Register the heartbeat with Machine Manager.
*
* This requires no synchronization since the heartbeat queue is
* ConcurrentLinkedQueue. Hence we don't protect it specifically via a lock.
*
* @param datanodeID - Name of the datanode that send us heartbeat.
*/
@Override
public void updateHeartbeat(DatanodeID datanodeID) {
// Checking for NULL to make sure that we don't get
// an exception from ConcurrentList.
// This could be a problem in tests, if this function is invoked via
// protobuf, transport layer will guarantee that this is not null.
if (datanodeID != null) {
heartbeatQueue.add(datanodeID);
return;
}
LOG.error("Datanode ID in heartbeat is null");
}
/** /**
* Removes a data node from the management of this Node Manager. * Removes a data node from the management of this Node Manager.
* *
@ -203,7 +170,7 @@ public void removeNode(DatanodeID node) throws UnregisteredNodeException {
*/ */
@Override @Override
public List<DatanodeID> getNodes(NODESTATE nodestate) public List<DatanodeID> getNodes(NODESTATE nodestate)
throws IllegalArgumentException{ throws IllegalArgumentException {
Map<String, Long> set; Map<String, Long> set;
switch (nodestate) { switch (nodestate) {
case HEALTHY: case HEALTHY:
@ -226,7 +193,7 @@ public List<DatanodeID> getNodes(NODESTATE nodestate)
} }
return set.entrySet().stream().map(entry -> nodes.get(entry.getKey())) return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
/** /**
@ -241,7 +208,7 @@ public List<DatanodeID> getAllNodes() {
set = Collections.unmodifiableMap(new HashMap<>(nodes)); set = Collections.unmodifiableMap(new HashMap<>(nodes));
} }
return set.entrySet().stream().map(entry -> nodes.get(entry.getKey())) return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
/** /**
@ -257,7 +224,7 @@ public int getMinimumChillModeNodes() {
/** /**
* Sets the Minimum chill mode nodes count, used only in testing. * Sets the Minimum chill mode nodes count, used only in testing.
* *
* @param count - Number of nodes. * @param count - Number of nodes.
*/ */
@VisibleForTesting @VisibleForTesting
public void setMinimumChillModeNodes(int count) { public void setMinimumChillModeNodes(int count) {
@ -329,7 +296,7 @@ private String getNodeStatus() {
*/ */
@Override @Override
public boolean isInManualChillMode() { public boolean isInManualChillMode() {
if(this.inManualChillMode.isPresent()) { if (this.inManualChillMode.isPresent()) {
return this.inManualChillMode.get(); return this.inManualChillMode.get();
} }
return false; return false;
@ -373,6 +340,7 @@ public int getNodeCount(NODESTATE nodestate) {
/** /**
* Used for testing. * Used for testing.
*
* @return true if the HB check is done. * @return true if the HB check is done.
*/ */
@VisibleForTesting @VisibleForTesting
@ -383,14 +351,14 @@ public boolean waitForHeartbeatThead() {
/** /**
* This is the real worker thread that processes the HB queue. We do the * This is the real worker thread that processes the HB queue. We do the
* following things in this thread. * following things in this thread.
* * <p>
* Process the Heartbeats that are in the HB Queue. * Process the Heartbeats that are in the HB Queue. Move Stale or Dead node to
* Move Stale or Dead node to healthy if we got a heartbeat from them. * healthy if we got a heartbeat from them. Move Stales Node to dead node
* Move Stales Node to dead node table if it is needed. * table if it is needed. Move healthy nodes to stale nodes if it is needed.
* Move healthy nodes to stale nodes if it is needed. * <p>
*
* if it is a new node, we call register node and add it to the list of nodes. * if it is a new node, we call register node and add it to the list of nodes.
* This will be replaced when we support registration of a node in SCM. * This will be replaced when we support registration of a node in SCM.
*
* @see Thread#run() * @see Thread#run()
*/ */
@Override @Override
@ -553,10 +521,7 @@ private void handleHeartbeat(DatanodeID datanodeID) {
healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
deadNodeCount.decrementAndGet(); deadNodeCount.decrementAndGet();
healthyNodeCount.incrementAndGet(); healthyNodeCount.incrementAndGet();
return;
} }
registerNode(datanodeID);
} }
/** /**
@ -587,4 +552,103 @@ long getLastHBProcessedCount() {
return lastHBProcessedCount; return lastHBProcessedCount;
} }
/**
* Gets the version info from SCM.
*
* @param versionRequest - version Request.
* @return - returns SCM version info and other required information needed by
* datanode.
*/
@Override
public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
return VersionResponse.newBuilder()
.setVersion(this.version.getVersion())
.build();
}
/**
* Register the node if the node finds that it is not registered with any
* SCM.
*
* @param datanodeID - Send datanodeID with Node info. This function
* generates and assigns new datanode ID for the datanode.
* This allows SCM to be run independent of Namenode if
* required.
*
* @return SCMHeartbeatResponseProto
*/
@Override
public SCMCommand register(DatanodeID datanodeID) {
SCMCommand errorCode = verifyDatanodeUUID(datanodeID);
if (errorCode != null) {
return errorCode;
}
DatanodeID newDatanodeID = new DatanodeID(UUID.randomUUID().toString(),
datanodeID);
nodes.put(newDatanodeID.getDatanodeUuid(), newDatanodeID);
totalNodes.incrementAndGet();
healthyNodes.put(newDatanodeID.getDatanodeUuid(), monotonicNow());
healthyNodeCount.incrementAndGet();
LOG.info("Data node with ID: {} Registered.",
newDatanodeID.getDatanodeUuid());
return RegisteredCommand.newBuilder()
.setErrorCode(ErrorCode.success)
.setDatanodeUUID(newDatanodeID.getDatanodeUuid())
.setClusterID(this.clusterID)
.build();
}
/**
* Verifies the datanode does not have a valid UUID already.
*
* @param datanodeID - Datanode UUID.
* @return SCMCommand
*/
private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) {
// Make sure that we return the right error code, so that
// data node can log the correct error. if it is already registered then
// datanode should move to heartbeat state. It implies that somehow we
// have an error where the data node is trying to re-register.
//
// We are going to let the datanode know that there is an error but allow it
// to recover by sending it the right info that is needed for recovery.
if (datanodeID.getDatanodeUuid() != null &&
nodes.containsKey(datanodeID.getDatanodeUuid())) {
LOG.error("Datanode is already registered. Datanode: {}",
datanodeID.toString());
return RegisteredCommand.newBuilder()
.setErrorCode(ErrorCode.errorNodeAlreadyRegistered)
.setClusterID(this.clusterID)
.setDatanodeUUID(datanodeID.getDatanodeUuid())
.build();
}
return null;
}
/**
* Send heartbeat to indicate the datanode is alive and doing well.
*
* @param datanodeID - Datanode ID.
* @return SCMheartbeat response.
* @throws IOException
*/
@Override
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID) {
// Checking for NULL to make sure that we don't get
// an exception from ConcurrentList.
// This could be a problem in tests, if this function is invoked via
// protobuf, transport layer will guarantee that this is not null.
if (datanodeID != null) {
heartbeatQueue.add(datanodeID);
} else {
LOG.error("Datanode ID in heartbeat is null");
}
return commandQueue.getCommand(datanodeID);
}
} }

View File

@ -67,26 +67,45 @@ message SCMVersionResponseProto {
* registeration of a datanode. * registeration of a datanode.
*/ */
message RegisteredCmdResponseProto { message RegisteredCmdResponseProto {
required string datanodeUUID = 1; enum ErrorCode {
required string clusterID = 2; success = 1;
errorNodeAlreadyRegistered = 2;
errorNodeNotPermitted = 3;
}
required ErrorCode errorCode = 1;
optional string datanodeUUID = 2;
optional string clusterID = 3;
}
/**
* Empty Command Response
*/
message NullCmdResponseProto {
} }
/* /*
* These are commands returned by SCM for to the datanode to execute. * These are commands returned by SCM for to the datanode to execute.
*/ */
message SCMCommandResponseProto {
message SCMHeartbeatResponseProto {
enum Type { enum Type {
nullCmd = 1; nullCmd = 1;
registeredCmd = 2; // Returns the datanode ID after registeration. registeredCmd = 2; // Returns the datanode ID after registeration.
} }
required Type cmdType = 1; // Type of the command required Type cmdType = 1; // Type of the command
optional RegisteredCmdResponseProto registerNode = 2; optional NullCmdResponseProto nullCommand = 2;
optional RegisteredCmdResponseProto registerNode = 3;
} }
/*
* A group of commands for the datanode to execute
*/
message SCMHeartbeatResponseProto {
repeated SCMCommandResponseProto commands = 1;
}
/** /**
* Protocol used from a datanode to StorageContainerManager. * Protocol used from a datanode to StorageContainerManager.
@ -151,8 +170,6 @@ message SCMHeartbeatResponseProto {
* Once in the heartbeat state, datanode sends heartbeats and container reports * Once in the heartbeat state, datanode sends heartbeats and container reports
* to SCM and process commands issued by SCM until it is shutdown. * to SCM and process commands issued by SCM until it is shutdown.
* *
* For time being we are going to use SCMHeartbeatResponseProto as the return
* type for register and sendheartbeat messages.
*/ */
service StorageContainerDatanodeProtocolService { service StorageContainerDatanodeProtocolService {
@ -164,13 +181,12 @@ service StorageContainerDatanodeProtocolService {
/** /**
* Registers a data node with SCM. * Registers a data node with SCM.
*/ */
rpc register(SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto); rpc register(SCMHeartbeatRequestProto) returns (SCMCommandResponseProto);
/** /**
* Send heartbeat from datanode to SCM. HB's under SCM looks more * Send heartbeat from datanode to SCM. HB's under SCM looks more
* like life line protocol than HB's under HDFS. In other words, it is * like life line protocol than HB's under HDFS. In other words, it is
* extremely light weight and contains no data payload. * extremely light weight and contains no data payload.
*/ */
rpc sendHeartbeat(SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto); rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
} }

View File

@ -1,26 +1,27 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with this * or more contributor license agreements. See the NOTICE file
* work for additional information regarding copyright ownership. The ASF * distributed with this work for additional information
* licenses this file to you under the Apache License, Version 2.0 (the * regarding copyright ownership. The ASF licenses this file
* "License"); you may not use this file except in compliance with the License. * to you under the Apache License, Version 2.0 (the
* You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* <p> * with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0 *
* <p> * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * distributed under the License is distributed on an "AS IS" BASIS,
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* License for the specific language governing permissions and limitations under * See the License for the specific language governing permissions and
* the License. * limitations under the License.
*/ */
package org.apache.hadoop.ozone.scm.node; package org.apache.hadoop.ozone.scm.node;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.hamcrest.CoreMatchers; import org.hamcrest.CoreMatchers;
import org.junit.Assert; import org.junit.Assert;
@ -76,8 +77,9 @@ Configuration getConf() {
* *
* @return DatanodeID * @return DatanodeID
*/ */
DatanodeID getDatanodeID() { DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
return getDatanodeID(UUID.randomUUID().toString());
return getDatanodeID(nodeManager, UUID.randomUUID().toString());
} }
/** /**
@ -86,16 +88,19 @@ DatanodeID getDatanodeID() {
* @param uuid - node ID, it is generally UUID. * @param uuid - node ID, it is generally UUID.
* @return DatanodeID. * @return DatanodeID.
*/ */
DatanodeID getDatanodeID(String uuid) { DatanodeID getDatanodeID(SCMNodeManager nodeManager, String uuid) {
Random random = new Random(); Random random = new Random();
String ipAddress = random.nextInt(256) + "." String ipAddress = random.nextInt(256) + "."
+ random.nextInt(256) + "." + random.nextInt(256) + "."
+ random.nextInt(256) + "." + random.nextInt(256) + "."
+ random.nextInt(256); + random.nextInt(256);
String hostName = RandomStringUtils.randomAscii(8); String hostName = uuid;
return new DatanodeID(ipAddress, hostName, uuid, DatanodeID tempDataNode = new DatanodeID(ipAddress,
0, 0, 0, 0); hostName, uuid, 0, 0, 0, 0);
RegisteredCommand command =
(RegisteredCommand) nodeManager.register(tempDataNode);
return new DatanodeID(command.getDatanodeUUID(), tempDataNode);
} }
/** /**
@ -107,7 +112,8 @@ DatanodeID getDatanodeID(String uuid) {
*/ */
SCMNodeManager createNodeManager(Configuration config) throws IOException { SCMNodeManager createNodeManager(Configuration config) throws IOException {
SCMNodeManager nodeManager = new SCMNodeManager(config); SCMNodeManager nodeManager = new SCMNodeManager(config,
UUID.randomUUID().toString());
assertFalse("Node manager should be in chill mode", assertFalse("Node manager should be in chill mode",
nodeManager.isOutOfNodeChillMode()); nodeManager.isOutOfNodeChillMode());
return nodeManager; return nodeManager;
@ -126,10 +132,10 @@ public void testScmHeartbeat() throws IOException,
InterruptedException, TimeoutException { InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) { try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
// Send some heartbeats from different nodes. // Send some heartbeats from different nodes.
for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) { for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
nodeManager.updateHeartbeat(getDatanodeID()); DatanodeID datanodeID = getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID);
} }
// Wait for 4 seconds max. // Wait for 4 seconds max.
@ -175,7 +181,7 @@ public void testScmNotEnoughHeartbeats() throws IOException,
// Need 100 nodes to come out of chill mode, only one node is sending HB. // Need 100 nodes to come out of chill mode, only one node is sending HB.
nodeManager.setMinimumChillModeNodes(100); nodeManager.setMinimumChillModeNodes(100);
nodeManager.updateHeartbeat(getDatanodeID()); nodeManager.sendHeartbeat(getDatanodeID(nodeManager));
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000); 4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have been in " + assertFalse("Not enough heartbeat, Node manager should have been in " +
@ -197,11 +203,11 @@ public void testScmSameNodeHeartbeats() throws IOException,
try (SCMNodeManager nodeManager = createNodeManager(getConf())) { try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
nodeManager.setMinimumChillModeNodes(3); nodeManager.setMinimumChillModeNodes(3);
DatanodeID datanodeID = getDatanodeID(); DatanodeID datanodeID = getDatanodeID(nodeManager);
// Send 10 heartbeat from same node, and assert we never leave chill mode. // Send 10 heartbeat from same node, and assert we never leave chill mode.
for (int x = 0; x < 10; x++) { for (int x = 0; x < 10; x++) {
nodeManager.updateHeartbeat(datanodeID); nodeManager.sendHeartbeat(datanodeID);
} }
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
@ -226,17 +232,15 @@ public void testScmShutdown() throws IOException, InterruptedException,
Configuration conf = getConf(); Configuration conf = getConf();
conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
SCMNodeManager nodeManager = createNodeManager(conf); SCMNodeManager nodeManager = createNodeManager(conf);
DatanodeID datanodeID = getDatanodeID(nodeManager);
nodeManager.close(); nodeManager.close();
// These should never be processed. // These should never be processed.
nodeManager.updateHeartbeat(getDatanodeID()); nodeManager.sendHeartbeat(datanodeID);
// Let us just wait for 2 seconds to prove that HBs are not processed. // Let us just wait for 2 seconds to prove that HBs are not processed.
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
assertFalse("Node manager executor service is shutdown, should never exit" +
" chill mode", nodeManager.isOutOfNodeChillMode());
assertEquals("Assert new HBs were never processed", 0, assertEquals("Assert new HBs were never processed", 0,
nodeManager.getLastHBProcessedCount()); nodeManager.getLastHBProcessedCount());
} }
@ -256,8 +260,10 @@ public void testScmHealthyNodeCount() throws IOException,
final int count = 10; final int count = 10;
try (SCMNodeManager nodeManager = createNodeManager(conf)) { try (SCMNodeManager nodeManager = createNodeManager(conf)) {
for (int x = 0; x < count; x++) { for (int x = 0; x < count; x++) {
nodeManager.updateHeartbeat(getDatanodeID()); DatanodeID datanodeID = getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID);
} }
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000); 4 * 1000);
@ -338,20 +344,19 @@ public void testScmDetectStaleNode() throws IOException,
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
try (SCMNodeManager nodeManager = createNodeManager(conf)) { try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeID> nodeList = new LinkedList<>(); List<DatanodeID> nodeList = createNodeSet(nodeManager, nodeCount,
DatanodeID staleNode = getDatanodeID(); "staleNode");
for (int x = 0; x < nodeCount; x++) { DatanodeID staleNode = getDatanodeID(nodeManager);
nodeList.add(getDatanodeID());
}
// Heartbeat once // Heartbeat once
nodeManager.updateHeartbeat(staleNode); nodeManager.sendHeartbeat(staleNode);
// Heartbeat all other nodes. // Heartbeat all other nodes.
nodeList.forEach(nodeManager::updateHeartbeat); nodeList.forEach(nodeManager::sendHeartbeat);
// Wait for 2 seconds .. and heartbeat good nodes again. // Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
nodeList.forEach(nodeManager::updateHeartbeat); nodeList.forEach(nodeManager::sendHeartbeat);
// Wait for 2 more seconds, 3 seconds is the stale window for this test // Wait for 2 more seconds, 3 seconds is the stale window for this test
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
@ -388,36 +393,34 @@ public void testScmDetectDeadNode() throws IOException,
try (SCMNodeManager nodeManager = createNodeManager(conf)) { try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeID> nodeList = new LinkedList<>(); List<DatanodeID> nodeList = createNodeSet(nodeManager, nodeCount,
"Node");
DatanodeID deadNode = getDatanodeID(nodeManager);
DatanodeID deadNode = getDatanodeID();
for (int x = 0; x < nodeCount; x++) {
nodeList.add(getDatanodeID());
}
// Heartbeat once // Heartbeat once
nodeManager.updateHeartbeat(deadNode); nodeManager.sendHeartbeat(deadNode);
// Heartbeat all other nodes. // Heartbeat all other nodes.
nodeList.forEach(nodeManager::updateHeartbeat); nodeList.forEach(nodeManager::sendHeartbeat);
// Wait for 2 seconds .. and heartbeat good nodes again. // Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
nodeList.forEach(nodeManager::updateHeartbeat); nodeList.forEach(nodeManager::sendHeartbeat);
Thread.sleep(3 * 1000); Thread.sleep(3 * 1000);
// heartbeat good nodes again. // heartbeat good nodes again.
nodeList.forEach(nodeManager::updateHeartbeat); nodeList.forEach(nodeManager::sendHeartbeat);
// 6 seconds is the dead window for this test , so we wait a total of // 6 seconds is the dead window for this test , so we wait a total of
// 7 seconds to make sure that the node moves into dead state. // 7 seconds to make sure that the node moves into dead state.
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
// Check for the dead node now. // Check for the dead node now.
List<DatanodeID> deadNodeList = nodeManager List<DatanodeID> deadNodeList = nodeManager.getNodes(DEAD);
.getNodes(DEAD); assertEquals("Expected to find 1 dead node", 1,
assertEquals("Expected to find 1 dead node", 1, nodeManager nodeManager.getNodeCount(DEAD));
.getNodeCount(DEAD));
assertEquals("Expected to find 1 dead node", 1, deadNodeList.size()); assertEquals("Expected to find 1 dead node", 1, deadNodeList.size());
assertEquals("Dead node is not the expected ID", deadNode assertEquals("Dead node is not the expected ID", deadNode
.getDatanodeUuid(), deadNodeList.get(0).getDatanodeUuid()); .getDatanodeUuid(), deadNodeList.get(0).getDatanodeUuid());
@ -438,9 +441,8 @@ public void testScmDuplicateRegistrationLogsError() throws IOException,
try (SCMNodeManager nodeManager = createNodeManager(getConf())) { try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
DatanodeID duplicateNodeID = getDatanodeID(); DatanodeID duplicateNodeID = getDatanodeID(nodeManager);
nodeManager.registerNode(duplicateNodeID); nodeManager.register(duplicateNodeID);
nodeManager.registerNode(duplicateNodeID);
logCapturer.stopCapturing(); logCapturer.stopCapturing();
assertThat(logCapturer.getOutput(), containsString("Datanode is already" + assertThat(logCapturer.getOutput(), containsString("Datanode is already" +
" registered.")); " registered."));
@ -460,7 +462,7 @@ public void testScmLogErrorOnNullDatanode() throws IOException,
try (SCMNodeManager nodeManager = createNodeManager(getConf())) { try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
nodeManager.updateHeartbeat(null); nodeManager.sendHeartbeat(null);
logCapturer.stopCapturing(); logCapturer.stopCapturing();
assertThat(logCapturer.getOutput(), containsString("Datanode ID in " + assertThat(logCapturer.getOutput(), containsString("Datanode ID in " +
"heartbeat is null")); "heartbeat is null"));
@ -486,11 +488,6 @@ public void testScmLogErrorOnNullDatanode() throws IOException,
@Test @Test
public void testScmClusterIsInExpectedState1() throws IOException, public void testScmClusterIsInExpectedState1() throws IOException,
InterruptedException, TimeoutException { InterruptedException, TimeoutException {
DatanodeID healthyNode = getDatanodeID("HealthyNode");
DatanodeID staleNode = getDatanodeID("StaleNode");
DatanodeID deadNode = getDatanodeID("DeadNode");
/** /**
* These values are very important. Here is what it means so you don't * These values are very important. Here is what it means so you don't
* have to look it up while reading this code. * have to look it up while reading this code.
@ -535,9 +532,12 @@ public void testScmClusterIsInExpectedState1() throws IOException,
* Cluster state: Healthy: All nodes are heartbeat-ing like normal. * Cluster state: Healthy: All nodes are heartbeat-ing like normal.
*/ */
try (SCMNodeManager nodeManager = createNodeManager(conf)) { try (SCMNodeManager nodeManager = createNodeManager(conf)) {
nodeManager.updateHeartbeat(healthyNode); DatanodeID healthyNode = getDatanodeID(nodeManager, "HealthyNode");
nodeManager.updateHeartbeat(staleNode); DatanodeID staleNode = getDatanodeID(nodeManager, "StaleNode");
nodeManager.updateHeartbeat(deadNode); DatanodeID deadNode = getDatanodeID(nodeManager, "DeadNode");
nodeManager.sendHeartbeat(healthyNode);
nodeManager.sendHeartbeat(staleNode);
nodeManager.sendHeartbeat(deadNode);
// Sleep so that heartbeat processing thread gets to run. // Sleep so that heartbeat processing thread gets to run.
Thread.sleep(500); Thread.sleep(500);
@ -563,12 +563,12 @@ public void testScmClusterIsInExpectedState1() throws IOException,
* the 3 second windows. * the 3 second windows.
*/ */
nodeManager.updateHeartbeat(healthyNode); nodeManager.sendHeartbeat(healthyNode);
nodeManager.updateHeartbeat(staleNode); nodeManager.sendHeartbeat(staleNode);
nodeManager.updateHeartbeat(deadNode); nodeManager.sendHeartbeat(deadNode);
Thread.sleep(1500); Thread.sleep(1500);
nodeManager.updateHeartbeat(healthyNode); nodeManager.sendHeartbeat(healthyNode);
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
assertEquals(1, nodeManager.getNodeCount(HEALTHY)); assertEquals(1, nodeManager.getNodeCount(HEALTHY));
@ -588,10 +588,10 @@ public void testScmClusterIsInExpectedState1() throws IOException,
* staleNode to move to stale state and deadNode to move to dead state. * staleNode to move to stale state and deadNode to move to dead state.
*/ */
nodeManager.updateHeartbeat(healthyNode); nodeManager.sendHeartbeat(healthyNode);
nodeManager.updateHeartbeat(staleNode); nodeManager.sendHeartbeat(staleNode);
Thread.sleep(1500); Thread.sleep(1500);
nodeManager.updateHeartbeat(healthyNode); nodeManager.sendHeartbeat(healthyNode);
Thread.sleep(2 * 1000); Thread.sleep(2 * 1000);
// 3.5 seconds have elapsed for stale node, so it moves into Stale. // 3.5 seconds have elapsed for stale node, so it moves into Stale.
@ -617,14 +617,13 @@ public void testScmClusterIsInExpectedState1() throws IOException,
assertEquals("Expected one dead node", 1, deadList.size()); assertEquals("Expected one dead node", 1, deadList.size());
assertEquals("Dead node is not the expected ID", deadNode assertEquals("Dead node is not the expected ID", deadNode
.getDatanodeUuid(), deadList.get(0).getDatanodeUuid()); .getDatanodeUuid(), deadList.get(0).getDatanodeUuid());
/** /**
* Cluster State : let us heartbeat all the nodes and verify that we get * Cluster State : let us heartbeat all the nodes and verify that we get
* back all the nodes in healthy state. * back all the nodes in healthy state.
*/ */
nodeManager.updateHeartbeat(healthyNode); nodeManager.sendHeartbeat(healthyNode);
nodeManager.updateHeartbeat(staleNode); nodeManager.sendHeartbeat(staleNode);
nodeManager.updateHeartbeat(deadNode); nodeManager.sendHeartbeat(deadNode);
Thread.sleep(500); Thread.sleep(500);
//Assert all nodes are healthy. //Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size()); assertEquals(3, nodeManager.getAllNodes().size());
@ -640,10 +639,10 @@ public void testScmClusterIsInExpectedState1() throws IOException,
* @param sleepDuration - Duration to sleep between heartbeats. * @param sleepDuration - Duration to sleep between heartbeats.
* @throws InterruptedException * @throws InterruptedException
*/ */
private void heartbeatNodeSet(NodeManager manager, List<DatanodeID> list, private void heartbeatNodeSet(SCMNodeManager manager, List<DatanodeID> list,
int sleepDuration) throws InterruptedException { int sleepDuration) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
list.forEach(manager::updateHeartbeat); list.forEach(manager::sendHeartbeat);
Thread.sleep(sleepDuration); Thread.sleep(sleepDuration);
} }
} }
@ -655,10 +654,12 @@ private void heartbeatNodeSet(NodeManager manager, List<DatanodeID> list,
* @param prefix - A prefix string that can be used in verification. * @param prefix - A prefix string that can be used in verification.
* @return List of Nodes. * @return List of Nodes.
*/ */
private List<DatanodeID> createNodeSet(int count, String prefix) { private List<DatanodeID> createNodeSet(SCMNodeManager nodeManager, int
count, String
prefix) {
List<DatanodeID> list = new LinkedList<>(); List<DatanodeID> list = new LinkedList<>();
for (int x = 0; x < count; x++) { for (int x = 0; x < count; x++) {
list.add(getDatanodeID(prefix + x)); list.add(getDatanodeID(nodeManager, prefix + x));
} }
return list; return list;
} }
@ -696,12 +697,16 @@ public void testScmClusterIsInExpectedState2() throws IOException,
conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000); conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000); conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000);
List<DatanodeID> healthyNodeList = createNodeSet(healthyCount, "Healthy");
List<DatanodeID> staleNodeList = createNodeSet(staleCount, "Stale");
List<DatanodeID> deadNodeList = createNodeSet(deadCount, "Dead");
try (SCMNodeManager nodeManager = createNodeManager(conf)) { try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeID> healthyNodeList = createNodeSet(nodeManager,
healthyCount, "Healthy");
List<DatanodeID> staleNodeList = createNodeSet(nodeManager, staleCount,
"Stale");
List<DatanodeID> deadNodeList = createNodeSet(nodeManager, deadCount,
"Dead");
Runnable healthyNodeTask = () -> { Runnable healthyNodeTask = () -> {
try { try {
// 2 second heartbeat makes these nodes stay healthy. // 2 second heartbeat makes these nodes stay healthy.
@ -722,7 +727,7 @@ public void testScmClusterIsInExpectedState2() throws IOException,
// No Thread just one time HBs the node manager, so that these will be // No Thread just one time HBs the node manager, so that these will be
// marked as dead nodes eventually. // marked as dead nodes eventually.
deadNodeList.forEach(nodeManager::updateHeartbeat); deadNodeList.forEach(nodeManager::sendHeartbeat);
Thread thread1 = new Thread(healthyNodeTask); Thread thread1 = new Thread(healthyNodeTask);
thread1.setDaemon(true); thread1.setDaemon(true);
@ -745,7 +750,7 @@ public void testScmClusterIsInExpectedState2() throws IOException,
List<DatanodeID> deadList = nodeManager.getNodes(DEAD); List<DatanodeID> deadList = nodeManager.getNodes(DEAD);
for (DatanodeID node : deadList) { for (DatanodeID node : deadList) {
assertThat(node.getDatanodeUuid(), CoreMatchers.startsWith("Dead")); assertThat(node.getHostName(), CoreMatchers.startsWith("Dead"));
} }
// Checking stale nodes is tricky since they have to move between // Checking stale nodes is tricky since they have to move between
@ -772,8 +777,6 @@ public void testScmCanHandleScale() throws IOException,
InterruptedException, TimeoutException { InterruptedException, TimeoutException {
final int healthyCount = 3000; final int healthyCount = 3000;
final int staleCount = 3000; final int staleCount = 3000;
List<DatanodeID> healthyList = createNodeSet(healthyCount, "h");
List<DatanodeID> staleList = createNodeSet(staleCount, "s");
Configuration conf = getConf(); Configuration conf = getConf();
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
@ -781,6 +784,10 @@ public void testScmCanHandleScale() throws IOException,
conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000); conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
try (SCMNodeManager nodeManager = createNodeManager(conf)) { try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeID> healthyList = createNodeSet(nodeManager,
healthyCount, "h");
List<DatanodeID> staleList = createNodeSet(nodeManager, staleCount, "s");
Runnable healthyNodeTask = () -> { Runnable healthyNodeTask = () -> {
try { try {
heartbeatNodeSet(nodeManager, healthyList, 2 * 1000); heartbeatNodeSet(nodeManager, healthyList, 2 * 1000);
@ -800,7 +807,6 @@ public void testScmCanHandleScale() throws IOException,
thread1.setDaemon(true); thread1.setDaemon(true);
thread1.start(); thread1.start();
Thread thread2 = new Thread(staleNodeTask); Thread thread2 = new Thread(staleNodeTask);
thread2.setDaemon(true); thread2.setDaemon(true);
thread2.start(); thread2.start();
@ -829,7 +835,6 @@ public void testScmCanHandleScale() throws IOException,
public void testScmLogsHeartbeatFlooding() throws IOException, public void testScmLogsHeartbeatFlooding() throws IOException,
InterruptedException { InterruptedException {
final int healthyCount = 3000; final int healthyCount = 3000;
List<DatanodeID> healthyList = createNodeSet(healthyCount, "h");
// Make the HB process thread run slower. // Make the HB process thread run slower.
Configuration conf = getConf(); Configuration conf = getConf();
@ -838,6 +843,8 @@ public void testScmLogsHeartbeatFlooding() throws IOException,
conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500); conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500);
try (SCMNodeManager nodeManager = createNodeManager(conf)) { try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeID> healthyList = createNodeSet(nodeManager, healthyCount,
"h");
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
Runnable healthyNodeTask = () -> { Runnable healthyNodeTask = () -> {
@ -871,7 +878,8 @@ public void testScmEnterAndExistChillMode() throws IOException,
try (SCMNodeManager nodeManager = createNodeManager(conf)) { try (SCMNodeManager nodeManager = createNodeManager(conf)) {
nodeManager.setMinimumChillModeNodes(10); nodeManager.setMinimumChillModeNodes(10);
nodeManager.updateHeartbeat(getDatanodeID()); DatanodeID datanodeID = getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID);
String status = nodeManager.getChillModeStatus(); String status = nodeManager.getChillModeStatus();
Assert.assertThat(status, CoreMatchers.containsString("Still in chill " + Assert.assertThat(status, CoreMatchers.containsString("Still in chill " +
"mode. Waiting on nodes to report in.")); "mode. Waiting on nodes to report in."));
@ -900,7 +908,8 @@ public void testScmEnterAndExistChillMode() throws IOException,
// Assert that node manager force enter cannot be overridden by nodes HBs. // Assert that node manager force enter cannot be overridden by nodes HBs.
for(int x= 0; x < 20; x++) { for(int x= 0; x < 20; x++) {
nodeManager.updateHeartbeat(getDatanodeID()); DatanodeID datanode = getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanode);
} }
Thread.sleep(500); Thread.sleep(500);