HDDS-363. Faster datanode registration during the first startup. Contributed by Elek, Marton.

This commit is contained in:
Márton Elek 2018-08-24 15:36:10 +02:00
parent 55b6931059
commit 138b0c1443
5 changed files with 47 additions and 10 deletions

View File

@ -48,8 +48,6 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
/** /**
* State Machine Class. * State Machine Class.
*/ */
@ -60,7 +58,6 @@ public class DatanodeStateMachine implements Closeable {
private final ExecutorService executorService; private final ExecutorService executorService;
private final Configuration conf; private final Configuration conf;
private final SCMConnectionManager connectionManager; private final SCMConnectionManager connectionManager;
private final long heartbeatFrequency;
private StateContext context; private StateContext context;
private final OzoneContainer container; private final OzoneContainer container;
private DatanodeDetails datanodeDetails; private DatanodeDetails datanodeDetails;
@ -86,7 +83,6 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
.setNameFormat("Datanode State Machine Thread - %d").build()); .setNameFormat("Datanode State Machine Thread - %d").build());
connectionManager = new SCMConnectionManager(conf); connectionManager = new SCMConnectionManager(conf);
context = new StateContext(this.conf, DatanodeStates.getInitState(), this); context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
heartbeatFrequency = getScmHeartbeatInterval(conf);
container = new OzoneContainer(this.datanodeDetails, container = new OzoneContainer(this.datanodeDetails,
new OzoneConfiguration(conf), context); new OzoneConfiguration(conf), context);
nextHB = new AtomicLong(Time.monotonicNow()); nextHB = new AtomicLong(Time.monotonicNow());
@ -147,6 +143,7 @@ private void start() throws IOException {
while (context.getState() != DatanodeStates.SHUTDOWN) { while (context.getState() != DatanodeStates.SHUTDOWN) {
try { try {
LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
long heartbeatFrequency = context.getHeartbeatFrequency();
nextHB.set(Time.monotonicNow() + heartbeatFrequency); nextHB.set(Time.monotonicNow() + heartbeatFrequency);
context.execute(executorService, heartbeatFrequency, context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);

View File

@ -33,6 +33,8 @@
import org.apache.hadoop.ozone.protocol.commands.CommandStatus import org.apache.hadoop.ozone.protocol.commands.CommandStatus
.CommandStatusBuilder; .CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -66,6 +68,13 @@ public class StateContext {
private final Queue<ContainerAction> containerActions; private final Queue<ContainerAction> containerActions;
private DatanodeStateMachine.DatanodeStates state; private DatanodeStateMachine.DatanodeStates state;
/**
* Starting with a 2 sec heartbeat frequency which will be updated to the
* real HB frequency after scm registration. With this method the
* initial registration could be significant faster.
*/
private AtomicLong heartbeatFrequency = new AtomicLong(2000);
/** /**
* Constructs a StateContext. * Constructs a StateContext.
* *
@ -398,4 +407,15 @@ public boolean updateCommandStatus(Long cmdId, boolean cmdExecuted) {
} }
return false; return false;
} }
public void configureHeartbeatFrequency(){
heartbeatFrequency.set(getScmHeartbeatInterval(conf));
}
/**
* Return current heartbeat frequency in ms.
*/
public long getHeartbeatFrequency() {
return heartbeatFrequency.get();
}
} }

View File

@ -101,6 +101,7 @@ public void execute(ExecutorService executor) {
return RegisterEndpointTask.newBuilder() return RegisterEndpointTask.newBuilder()
.setConfig(conf) .setConfig(conf)
.setEndpointStateMachine(endpoint) .setEndpointStateMachine(endpoint)
.setContext(context)
.setDatanodeDetails(context.getParent().getDatanodeDetails()) .setDatanodeDetails(context.getParent().getDatanodeDetails())
.setOzoneContainer(context.getParent().getContainer()) .setOzoneContainer(context.getParent().getContainer())
.build(); .build();

View File

@ -29,6 +29,7 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -50,6 +51,7 @@ public final class RegisterEndpointTask implements
private Future<EndpointStateMachine.EndPointStates> result; private Future<EndpointStateMachine.EndPointStates> result;
private DatanodeDetails datanodeDetails; private DatanodeDetails datanodeDetails;
private final OzoneContainer datanodeContainerManager; private final OzoneContainer datanodeContainerManager;
private StateContext stateContext;
/** /**
* Creates a register endpoint task. * Creates a register endpoint task.
@ -60,10 +62,12 @@ public final class RegisterEndpointTask implements
*/ */
@VisibleForTesting @VisibleForTesting
public RegisterEndpointTask(EndpointStateMachine rpcEndPoint, public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
Configuration conf, OzoneContainer ozoneContainer) { Configuration conf, OzoneContainer ozoneContainer,
StateContext context) {
this.rpcEndPoint = rpcEndPoint; this.rpcEndPoint = rpcEndPoint;
this.conf = conf; this.conf = conf;
this.datanodeContainerManager = ozoneContainer; this.datanodeContainerManager = ozoneContainer;
this.stateContext = context;
} }
@ -124,6 +128,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
rpcEndPoint.getState().getNextState(); rpcEndPoint.getState().getNextState();
rpcEndPoint.setState(nextState); rpcEndPoint.setState(nextState);
rpcEndPoint.zeroMissedCount(); rpcEndPoint.zeroMissedCount();
this.stateContext.configureHeartbeatFrequency();
} catch (IOException ex) { } catch (IOException ex) {
rpcEndPoint.logIfNeeded(ex); rpcEndPoint.logIfNeeded(ex);
} finally { } finally {
@ -150,6 +155,7 @@ public static class Builder {
private Configuration conf; private Configuration conf;
private DatanodeDetails datanodeDetails; private DatanodeDetails datanodeDetails;
private OzoneContainer container; private OzoneContainer container;
private StateContext context;
/** /**
* Constructs the builder class. * Constructs the builder class.
@ -200,6 +206,10 @@ public Builder setOzoneContainer(OzoneContainer ozoneContainer) {
return this; return this;
} }
public Builder setContext(StateContext stateContext) {
this.context = stateContext;
return this;
}
public RegisterEndpointTask build() { public RegisterEndpointTask build() {
if (endPointStateMachine == null) { if (endPointStateMachine == null) {
@ -210,8 +220,9 @@ public RegisterEndpointTask build() {
if (conf == null) { if (conf == null) {
LOG.error("No config specified."); LOG.error("No config specified.");
throw new IllegalArgumentException("A valid configration is needed to" + throw new IllegalArgumentException(
" construct RegisterEndpoint task"); "A valid configuration is needed to construct RegisterEndpoint "
+ "task");
} }
if (datanodeDetails == null) { if (datanodeDetails == null) {
@ -223,13 +234,20 @@ public RegisterEndpointTask build() {
if (container == null) { if (container == null) {
LOG.error("Container is not specified"); LOG.error("Container is not specified");
throw new IllegalArgumentException("Container is not specified to " + throw new IllegalArgumentException("Container is not specified to " +
"constrict RegisterEndpoint task"); "construct RegisterEndpoint task");
}
if (context == null) {
LOG.error("StateContext is not specified");
throw new IllegalArgumentException("Container is not specified to " +
"construct RegisterEndpoint task");
} }
RegisterEndpointTask task = new RegisterEndpointTask(this RegisterEndpointTask task = new RegisterEndpointTask(this
.endPointStateMachine, this.conf, this.container); .endPointStateMachine, this.conf, this.container, this.context);
task.setDatanodeDetails(datanodeDetails); task.setDatanodeDetails(datanodeDetails);
return task; return task;
} }
} }
} }

View File

@ -309,7 +309,8 @@ private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
when(ozoneContainer.getContainerReport()).thenReturn( when(ozoneContainer.getContainerReport()).thenReturn(
TestUtils.getRandomContainerReports(10)); TestUtils.getRandomContainerReports(10));
RegisterEndpointTask endpointTask = RegisterEndpointTask endpointTask =
new RegisterEndpointTask(rpcEndPoint, conf, ozoneContainer); new RegisterEndpointTask(rpcEndPoint, conf, ozoneContainer,
mock(StateContext.class));
if (!clearDatanodeDetails) { if (!clearDatanodeDetails) {
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
endpointTask.setDatanodeDetails(datanodeDetails); endpointTask.setDatanodeDetails(datanodeDetails);