HDDS-80. Remove SendContainerCommand from SCM. Contributed by Nanda Kumar.

This commit is contained in:
Xiaoyu Yao 2018-05-24 11:10:30 -07:00
parent c9b63deb53
commit 2d19e7d08f
7 changed files with 7 additions and 275 deletions

View File

@ -25,8 +25,6 @@
.CloseContainerHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.ContainerReportHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@ -88,7 +86,6 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
// When we add new handlers just adding a new handler here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
.addHandler(new ContainerReportHandler())
.addHandler(new CloseContainerHandler())
.addHandler(new DeleteBlocksCommandHandler(
container.getContainerManager(), conf))

View File

@ -1,114 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Container Report handler.
*/
public class ContainerReportHandler implements CommandHandler {
static final Logger LOG =
LoggerFactory.getLogger(ContainerReportHandler.class);
private int invocationCount;
private long totalTime;
/**
* Constructs a ContainerReport handler.
*/
public ContainerReportHandler() {
}
/**
* Handles a given SCM command.
*
* @param command - SCM Command
* @param container - Ozone Container.
* @param context - Current Context.
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.debug("Processing Container Report.");
invocationCount++;
long startTime = Time.monotonicNow();
try {
ContainerReportsRequestProto containerReport =
container.getContainerReport();
// TODO : We send this report to all SCMs.Check if it is enough only to
// send to the leader once we have RAFT enabled SCMs.
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
endPoint.getEndPoint().sendContainerReport(containerReport);
}
} catch (IOException ex) {
LOG.error("Unable to process the Container Report command.", ex);
} finally {
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
}
/**
* Returns the command type that this command handler handles.
*
* @return Type
*/
@Override
public SCMCmdType getCommandType() {
return SCMCmdType.sendContainerReport;
}
/**
* Returns number of times this handler has been invoked.
*
* @return int
*/
@Override
public int getInvocationCount() {
return invocationCount;
}
/**
* Returns the average time this function takes to run.
*
* @return long
*/
@Override
public long getAverageRunTime() {
if (invocationCount > 0) {
return totalTime / invocationCount;
}
return 0;
}
}

View File

@ -35,7 +35,6 @@
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -133,10 +132,6 @@ private void processResponse(SCMHeartbeatResponseProto response,
.equalsIgnoreCase(datanodeDetails.getUuid()),
"Unexpected datanode ID in the response.");
switch (commandResponseProto.getCmdType()) {
case sendContainerReport:
this.context.addCommand(SendContainerCommand.getFromProtobuf(
commandResponseProto.getSendReport()));
break;
case reregisterCommand:
if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
if (LOG.isDebugEnabled()) {

View File

@ -1,80 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCmdType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
/**
* Allows a Datanode to send in the container report.
*/
public class SendContainerCommand extends SCMCommand<SendContainerReportProto> {
/**
* Returns a NullCommand class from NullCommandResponse Proto.
* @param unused - unused
* @return NullCommand
*/
public static SendContainerCommand getFromProtobuf(
final SendContainerReportProto unused) {
return new SendContainerCommand();
}
/**
* returns a new builder.
* @return Builder
*/
public static SendContainerCommand.Builder newBuilder() {
return new SendContainerCommand.Builder();
}
/**
* Returns the type of this command.
*
* @return Type
*/
@Override
public SCMCmdType getType() {
return SCMCmdType.sendContainerReport;
}
/**
* Gets the protobuf message of this object.
*
* @return A protobuf message.
*/
@Override
public byte[] getProtoBufMessage() {
return SendContainerReportProto.newBuilder().build().toByteArray();
}
/**
* A Builder class this is the standard pattern we are using for all commands.
*/
public static class Builder {
/**
* Return a null command.
* @return - NullCommand.
*/
public SendContainerCommand build() {
return new SendContainerCommand();
}
}
}

View File

@ -186,10 +186,9 @@ Type of commands supported by SCM to datanode protocol.
enum SCMCmdType {
versionCommand = 2;
registeredCommand = 3;
sendContainerReport = 4;
reregisterCommand = 5;
deleteBlocksCommand = 6;
closeContainerCommand = 7;
reregisterCommand = 4;
deleteBlocksCommand = 5;
closeContainerCommand = 6;
}
/*
@ -199,11 +198,10 @@ message SCMCommandResponseProto {
required SCMCmdType cmdType = 2; // Type of the command
optional SCMRegisteredCmdResponseProto registeredProto = 3;
optional SCMVersionResponseProto versionProto = 4;
optional SendContainerReportProto sendReport = 5;
optional SCMReregisterCmdResponseProto reregisterProto = 6;
optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7;
required string datanodeUUID = 8;
optional SCMCloseContainerCmdResponseProto closeContainerProto = 9;
optional SCMReregisterCmdResponseProto reregisterProto = 5;
optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 6;
required string datanodeUUID = 7;
optional SCMCloseContainerCmdResponseProto closeContainerProto = 8;
}

View File

@ -21,12 +21,10 @@
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodePoolManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,19 +34,10 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.google.common.util.concurrent.Uninterruptibles
.sleepUninterruptibly;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
.INVALID;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
/**
* These are pools that are actively checking for replication status of the
* containers.
@ -177,56 +166,10 @@ public void startReconciliation() {
nodeProcessed = new AtomicInteger(0);
containerProcessedCount = new AtomicInteger(0);
nodeCount = new AtomicInteger(0);
/*
Ask each datanode to send us commands.
*/
SendContainerCommand cmd = SendContainerCommand.newBuilder().build();
for (DatanodeDetails dd : datanodeDetailsList) {
NodeState currentState = getNodestate(dd);
if (currentState == HEALTHY || currentState == STALE) {
nodeCount.incrementAndGet();
// Queue commands to all datanodes in this pool to send us container
// report. Since we ignore dead nodes, it is possible that we would have
// over replicated the container if the node comes back.
nodeManager.addDatanodeCommand(dd.getUuid(), cmd);
}
}
this.status = ProgressStatus.InProgress;
this.getPool().setLastProcessedTime(Time.monotonicNow());
}
/**
* Gets the node state.
*
* @param datanode - datanode information.
* @return NodeState.
*/
private NodeState getNodestate(DatanodeDetails datanode) {
NodeState currentState = INVALID;
int maxTry = 100;
// We need to loop to make sure that we will retry if we get
// node state unknown. This can lead to infinite loop if we send
// in unknown node ID. So max try count is used to prevent it.
int currentTry = 0;
while (currentState == INVALID && currentTry < maxTry) {
// Retry to make sure that we deal with the case of node state not
// known.
currentState = nodeManager.getNodeState(datanode);
currentTry++;
if (currentState == INVALID) {
// Sleep to make sure that this is not a tight loop.
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
}
if (currentState == INVALID) {
LOG.error("Not able to determine the state of Node: {}, Exceeded max " +
"try and node manager returns INVALID state. This indicates we " +
"are dealing with a node that we don't know about.", datanode);
}
return currentState;
}
/**
* Queues a container Report for handling. This is done in a worker thread
* since decoding a container report might be compute intensive . We don't

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
@ -46,7 +45,6 @@
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.versionCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.registeredCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.sendContainerReport;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.deleteBlocksCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand;
@ -318,11 +316,6 @@ public void join() throws InterruptedException {
.setCmdType(versionCommand)
.setVersionProto(SCMVersionResponseProto.getDefaultInstance())
.build();
case sendContainerReport:
return builder
.setCmdType(sendContainerReport)
.setSendReport(SendContainerReportProto.getDefaultInstance())
.build();
case reregisterCommand:
return builder
.setCmdType(reregisterCommand)