HDFS-13348. Ozone: Update IP and hostname in Datanode from SCM's response to the register call. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
12d80f8cae
commit
aae3ba24ca
@ -20,9 +20,10 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.EndpointStateMachine;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -40,7 +41,7 @@ public final class RegisterEndpointTask implements
|
||||
private final EndpointStateMachine rpcEndPoint;
|
||||
private final Configuration conf;
|
||||
private Future<EndpointStateMachine.EndPointStates> result;
|
||||
private DatanodeDetailsProto datanodeDetailsProto;
|
||||
private DatanodeDetails datanodeDetails;
|
||||
|
||||
/**
|
||||
* Creates a register endpoint task.
|
||||
@ -57,22 +58,22 @@ public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the DatanodeDetailsProto Proto.
|
||||
* Get the DatanodeDetails.
|
||||
*
|
||||
* @return DatanodeDetailsProto
|
||||
*/
|
||||
public DatanodeDetailsProto getDatanodeDetailsProto() {
|
||||
return datanodeDetailsProto;
|
||||
public DatanodeDetails getDatanodeDetails() {
|
||||
return datanodeDetails;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the contiainerNodeID Proto.
|
||||
*
|
||||
* @param datanodeDetailsProto - Container Node ID.
|
||||
* @param datanodeDetails - Container Node ID.
|
||||
*/
|
||||
public void setDatanodeDetailsProto(
|
||||
DatanodeDetailsProto datanodeDetailsProto) {
|
||||
this.datanodeDetailsProto = datanodeDetailsProto;
|
||||
public void setDatanodeDetails(
|
||||
DatanodeDetails datanodeDetails) {
|
||||
this.datanodeDetails = datanodeDetails;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,8 +85,8 @@ public void setDatanodeDetailsProto(
|
||||
@Override
|
||||
public EndpointStateMachine.EndPointStates call() throws Exception {
|
||||
|
||||
if (getDatanodeDetailsProto() == null) {
|
||||
LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " +
|
||||
if (getDatanodeDetails() == null) {
|
||||
LOG.error("DatanodeDetails cannot be null in RegisterEndpoint task, " +
|
||||
"shutting down the endpoint.");
|
||||
return rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
|
||||
}
|
||||
@ -94,8 +95,13 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
|
||||
try {
|
||||
|
||||
// TODO : Add responses to the command Queue.
|
||||
rpcEndPoint.getEndPoint().register(datanodeDetailsProto,
|
||||
SCMRegisteredCmdResponseProto response = rpcEndPoint.getEndPoint()
|
||||
.register(datanodeDetails.getProtoBufMessage(),
|
||||
conf.getStrings(ScmConfigKeys.OZONE_SCM_NAMES));
|
||||
if (response.hasHostname() && response.hasIpAddress()) {
|
||||
datanodeDetails.setHostName(response.getHostname());
|
||||
datanodeDetails.setIpAddress(response.getIpAddress());
|
||||
}
|
||||
EndpointStateMachine.EndPointStates nextState =
|
||||
rpcEndPoint.getState().getNextState();
|
||||
rpcEndPoint.setState(nextState);
|
||||
@ -187,7 +193,7 @@ public RegisterEndpointTask build() {
|
||||
|
||||
RegisterEndpointTask task = new RegisterEndpointTask(this
|
||||
.endPointStateMachine, this.conf);
|
||||
task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage());
|
||||
task.setDatanodeDetails(datanodeDetails);
|
||||
return task;
|
||||
}
|
||||
}
|
||||
|
@ -34,12 +34,20 @@ public class RegisteredCommand extends
|
||||
private String datanodeUUID;
|
||||
private String clusterID;
|
||||
private ErrorCode error;
|
||||
private String hostname;
|
||||
private String ipAddress;
|
||||
|
||||
public RegisteredCommand(final ErrorCode error, final String datanodeUUID,
|
||||
final String clusterID) {
|
||||
this(error, datanodeUUID, clusterID, null, null);
|
||||
}
|
||||
public RegisteredCommand(final ErrorCode error, final String datanodeUUID,
|
||||
final String clusterID, final String hostname, final String ipAddress) {
|
||||
this.datanodeUUID = datanodeUUID;
|
||||
this.clusterID = clusterID;
|
||||
this.error = error;
|
||||
this.hostname = hostname;
|
||||
this.ipAddress = ipAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -88,6 +96,22 @@ public ErrorCode getError() {
|
||||
return error;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the hostname.
|
||||
*
|
||||
* @return - hostname
|
||||
*/
|
||||
public String getHostName() {
|
||||
return hostname;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the ipAddress of the dataNode.
|
||||
*/
|
||||
public String getIpAddress() {
|
||||
return ipAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the protobuf message of this object.
|
||||
*
|
||||
@ -95,11 +119,15 @@ public ErrorCode getError() {
|
||||
*/
|
||||
@Override
|
||||
public byte[] getProtoBufMessage() {
|
||||
return SCMRegisteredCmdResponseProto.newBuilder()
|
||||
SCMRegisteredCmdResponseProto.Builder builder =
|
||||
SCMRegisteredCmdResponseProto.newBuilder()
|
||||
.setClusterID(this.clusterID)
|
||||
.setDatanodeUUID(this.datanodeUUID)
|
||||
.setErrorCode(this.error)
|
||||
.build().toByteArray();
|
||||
.setErrorCode(this.error);
|
||||
if (hostname != null && ipAddress != null) {
|
||||
builder.setHostname(hostname).setIpAddress(ipAddress);
|
||||
}
|
||||
return builder.build().toByteArray();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -109,6 +137,8 @@ public static class Builder {
|
||||
private String datanodeUUID;
|
||||
private String clusterID;
|
||||
private ErrorCode error;
|
||||
private String ipAddress;
|
||||
private String hostname;
|
||||
|
||||
/**
|
||||
* sets UUID.
|
||||
@ -130,10 +160,17 @@ public Builder setDatanodeUUID(String dnUUID) {
|
||||
public RegisteredCommand getFromProtobuf(SCMRegisteredCmdResponseProto
|
||||
response) {
|
||||
Preconditions.checkNotNull(response);
|
||||
if (response.hasHostname() && response.hasIpAddress()) {
|
||||
return new RegisteredCommand(response.getErrorCode(),
|
||||
response.hasDatanodeUUID() ? response.getDatanodeUUID() : "",
|
||||
response.hasClusterID() ? response.getClusterID() : "",
|
||||
response.getHostname(), response.getIpAddress());
|
||||
} else {
|
||||
return new RegisteredCommand(response.getErrorCode(),
|
||||
response.hasDatanodeUUID() ? response.getDatanodeUUID() : "",
|
||||
response.hasClusterID() ? response.getClusterID() : "");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets cluster ID.
|
||||
@ -157,21 +194,38 @@ public Builder setErrorCode(ErrorCode errorCode) {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* sets the hostname.
|
||||
*/
|
||||
public Builder setHostname(String hostname) {
|
||||
this.hostname = hostname;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setIpAddress(String ipAddress) {
|
||||
this.ipAddress = ipAddress;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the command object.
|
||||
*
|
||||
* @return RegisteredCommand
|
||||
*/
|
||||
public RegisteredCommand build() {
|
||||
if ((this.error == ErrorCode.success) &&
|
||||
(this.datanodeUUID == null || this.datanodeUUID.isEmpty()) ||
|
||||
(this.clusterID == null || this.clusterID.isEmpty())) {
|
||||
throw new IllegalArgumentException("On success, RegisteredCommand " +
|
||||
"needs datanodeUUID and ClusterID.");
|
||||
if ((this.error == ErrorCode.success) && (this.datanodeUUID == null
|
||||
|| this.datanodeUUID.isEmpty()) || (this.clusterID == null
|
||||
|| this.clusterID.isEmpty())) {
|
||||
throw new IllegalArgumentException("On success, RegisteredCommand "
|
||||
+ "needs datanodeUUID and ClusterID.");
|
||||
}
|
||||
if (hostname != null && ipAddress != null) {
|
||||
return new RegisteredCommand(this.error, this.datanodeUUID,
|
||||
this.clusterID, this.hostname, this.ipAddress);
|
||||
} else {
|
||||
return new RegisteredCommand(this.error, this.datanodeUUID,
|
||||
this.clusterID);
|
||||
}
|
||||
|
||||
return new
|
||||
RegisteredCommand(this.error, this.datanodeUUID, this.clusterID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -181,6 +181,8 @@ message SCMRegisteredCmdResponseProto {
|
||||
optional string datanodeUUID = 3;
|
||||
optional string clusterID = 4;
|
||||
optional SCMNodeAddressList addressList = 5;
|
||||
optional string hostname = 6;
|
||||
optional string ipAddress = 7;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -745,13 +745,15 @@ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
|
||||
@Override
|
||||
public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto) {
|
||||
|
||||
String hostname = null;
|
||||
String ip = null;
|
||||
DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(
|
||||
datanodeDetailsProto);
|
||||
InetAddress dnAddress = Server.getRemoteIp();
|
||||
if (dnAddress != null) {
|
||||
// Mostly called inside an RPC, update ip and peer hostname
|
||||
String hostname = dnAddress.getHostName();
|
||||
String ip = dnAddress.getHostAddress();
|
||||
hostname = dnAddress.getHostName();
|
||||
ip = dnAddress.getHostAddress();
|
||||
datanodeDetails.setHostName(hostname);
|
||||
datanodeDetails.setIpAddress(ip);
|
||||
}
|
||||
@ -788,11 +790,14 @@ public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto) {
|
||||
}
|
||||
LOG.info("Data node with ID: {} Registered.",
|
||||
datanodeDetails.getUuid());
|
||||
return RegisteredCommand.newBuilder()
|
||||
.setErrorCode(ErrorCode.success)
|
||||
RegisteredCommand.Builder builder =
|
||||
RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
|
||||
.setDatanodeUUID(datanodeDetails.getUuidString())
|
||||
.setClusterID(this.clusterID)
|
||||
.build();
|
||||
.setClusterID(this.clusterID);
|
||||
if (hostname != null && ip != null) {
|
||||
builder.setHostname(hostname).setIpAddress(ip);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -228,9 +228,7 @@ private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
|
||||
new RegisterEndpointTask(rpcEndPoint, conf);
|
||||
if (!clearDatanodeDetails) {
|
||||
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
|
||||
HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
|
||||
datanodeDetails.getProtoBufMessage();
|
||||
endpointTask.setDatanodeDetailsProto(datanodeDetailsProto);
|
||||
endpointTask.setDatanodeDetails(datanodeDetails);
|
||||
}
|
||||
endpointTask.call();
|
||||
return rpcEndPoint;
|
||||
|
Loading…
Reference in New Issue
Block a user