diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index a16bfdc26b..a8fe4949ae 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -25,8 +25,6 @@ .CloseContainerHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .CommandDispatcher; -import org.apache.hadoop.ozone.container.common.statemachine.commandhandler - .ContainerReportHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .DeleteBlocksCommandHandler; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -88,7 +86,6 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, // When we add new handlers just adding a new handler here should do the // trick. commandDispatcher = CommandDispatcher.newBuilder() - .addHandler(new ContainerReportHandler()) .addHandler(new CloseContainerHandler()) .addHandler(new DeleteBlocksCommandHandler( container.getContainerManager(), conf)) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java deleted file mode 100644 index fbea2901ec..0000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Container Report handler. - */ -public class ContainerReportHandler implements CommandHandler { - static final Logger LOG = - LoggerFactory.getLogger(ContainerReportHandler.class); - private int invocationCount; - private long totalTime; - - /** - * Constructs a ContainerReport handler. - */ - public ContainerReportHandler() { - } - - /** - * Handles a given SCM command. - * - * @param command - SCM Command - * @param container - Ozone Container. - * @param context - Current Context. - * @param connectionManager - The SCMs that we are talking to. - */ - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - LOG.debug("Processing Container Report."); - invocationCount++; - long startTime = Time.monotonicNow(); - try { - ContainerReportsRequestProto containerReport = - container.getContainerReport(); - - // TODO : We send this report to all SCMs.Check if it is enough only to - // send to the leader once we have RAFT enabled SCMs. - for (EndpointStateMachine endPoint : connectionManager.getValues()) { - endPoint.getEndPoint().sendContainerReport(containerReport); - } - } catch (IOException ex) { - LOG.error("Unable to process the Container Report command.", ex); - } finally { - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - } - - /** - * Returns the command type that this command handler handles. - * - * @return Type - */ - @Override - public SCMCmdType getCommandType() { - return SCMCmdType.sendContainerReport; - } - - /** - * Returns number of times this handler has been invoked. - * - * @return int - */ - @Override - public int getInvocationCount() { - return invocationCount; - } - - /** - * Returns the average time this function takes to run. - * - * @return long - */ - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 2f1db391ed..01b4c72428 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -35,7 +35,6 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,10 +132,6 @@ private void processResponse(SCMHeartbeatResponseProto response, .equalsIgnoreCase(datanodeDetails.getUuid()), "Unexpected datanode ID in the response."); switch (commandResponseProto.getCmdType()) { - case sendContainerReport: - this.context.addCommand(SendContainerCommand.getFromProtobuf( - commandResponseProto.getSendReport())); - break; case reregisterCommand: if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) { if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java deleted file mode 100644 index 84317526e7..0000000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SendContainerCommand.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.protocol.commands; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SendContainerReportProto; - -/** - * Allows a Datanode to send in the container report. - */ -public class SendContainerCommand extends SCMCommand { - /** - * Returns a NullCommand class from NullCommandResponse Proto. - * @param unused - unused - * @return NullCommand - */ - public static SendContainerCommand getFromProtobuf( - final SendContainerReportProto unused) { - return new SendContainerCommand(); - } - - /** - * returns a new builder. - * @return Builder - */ - public static SendContainerCommand.Builder newBuilder() { - return new SendContainerCommand.Builder(); - } - - /** - * Returns the type of this command. - * - * @return Type - */ - @Override - public SCMCmdType getType() { - return SCMCmdType.sendContainerReport; - } - - /** - * Gets the protobuf message of this object. - * - * @return A protobuf message. - */ - @Override - public byte[] getProtoBufMessage() { - return SendContainerReportProto.newBuilder().build().toByteArray(); - } - - /** - * A Builder class this is the standard pattern we are using for all commands. - */ - public static class Builder { - /** - * Return a null command. - * @return - NullCommand. - */ - public SendContainerCommand build() { - return new SendContainerCommand(); - } - } -} diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 91070b373a..20e6af8c36 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -186,10 +186,9 @@ Type of commands supported by SCM to datanode protocol. enum SCMCmdType { versionCommand = 2; registeredCommand = 3; - sendContainerReport = 4; - reregisterCommand = 5; - deleteBlocksCommand = 6; - closeContainerCommand = 7; + reregisterCommand = 4; + deleteBlocksCommand = 5; + closeContainerCommand = 6; } /* @@ -199,11 +198,10 @@ message SCMCommandResponseProto { required SCMCmdType cmdType = 2; // Type of the command optional SCMRegisteredCmdResponseProto registeredProto = 3; optional SCMVersionResponseProto versionProto = 4; - optional SendContainerReportProto sendReport = 5; - optional SCMReregisterCmdResponseProto reregisterProto = 6; - optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7; - required string datanodeUUID = 8; - optional SCMCloseContainerCmdResponseProto closeContainerProto = 9; + optional SCMReregisterCmdResponseProto reregisterProto = 5; + optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 6; + required string datanodeUUID = 7; + optional SCMCloseContainerCmdResponseProto closeContainerProto = 8; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java index af878bf97f..c444e904d7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java @@ -21,12 +21,10 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodePoolManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,19 +34,10 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.stream.Collectors; -import static com.google.common.util.concurrent.Uninterruptibles - .sleepUninterruptibly; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - .HEALTHY; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - .INVALID; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; - /** * These are pools that are actively checking for replication status of the * containers. @@ -177,56 +166,10 @@ public void startReconciliation() { nodeProcessed = new AtomicInteger(0); containerProcessedCount = new AtomicInteger(0); nodeCount = new AtomicInteger(0); - /* - Ask each datanode to send us commands. - */ - SendContainerCommand cmd = SendContainerCommand.newBuilder().build(); - for (DatanodeDetails dd : datanodeDetailsList) { - NodeState currentState = getNodestate(dd); - if (currentState == HEALTHY || currentState == STALE) { - nodeCount.incrementAndGet(); - // Queue commands to all datanodes in this pool to send us container - // report. Since we ignore dead nodes, it is possible that we would have - // over replicated the container if the node comes back. - nodeManager.addDatanodeCommand(dd.getUuid(), cmd); - } - } this.status = ProgressStatus.InProgress; this.getPool().setLastProcessedTime(Time.monotonicNow()); } - /** - * Gets the node state. - * - * @param datanode - datanode information. - * @return NodeState. - */ - private NodeState getNodestate(DatanodeDetails datanode) { - NodeState currentState = INVALID; - int maxTry = 100; - // We need to loop to make sure that we will retry if we get - // node state unknown. This can lead to infinite loop if we send - // in unknown node ID. So max try count is used to prevent it. - - int currentTry = 0; - while (currentState == INVALID && currentTry < maxTry) { - // Retry to make sure that we deal with the case of node state not - // known. - currentState = nodeManager.getNodeState(datanode); - currentTry++; - if (currentState == INVALID) { - // Sleep to make sure that this is not a tight loop. - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } - } - if (currentState == INVALID) { - LOG.error("Not able to determine the state of Node: {}, Exceeded max " + - "try and node manager returns INVALID state. This indicates we " + - "are dealing with a node that we don't know about.", datanode); - } - return currentState; - } - /** * Queues a container Report for handling. This is done in a worker thread * since decoding a container report might be compute intensive . We don't diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 58b8c82840..6e5b7debe5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; @@ -46,7 +45,6 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.versionCommand; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.registeredCommand; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.sendContainerReport; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.deleteBlocksCommand; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand; @@ -318,11 +316,6 @@ public void join() throws InterruptedException { .setCmdType(versionCommand) .setVersionProto(SCMVersionResponseProto.getDefaultInstance()) .build(); - case sendContainerReport: - return builder - .setCmdType(sendContainerReport) - .setSendReport(SendContainerReportProto.getDefaultInstance()) - .build(); case reregisterCommand: return builder .setCmdType(reregisterCommand)