diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 777efa70f8..c36ca1f934 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -171,6 +171,10 @@ public void resetPipeline() {
   public Map<String, DatanodeDetails> getDatanodes() {
     return datanodes;
   }
+
+  public boolean isEmpty() {
+    return datanodes.isEmpty();
+  }
   /**
    * Returns the leader host.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
index 4cf6321e4c..47368575c2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
@@ -82,6 +82,6 @@ protected CommandStatusReportsProto getReport() {
         map.remove(key);
       }
     });
-    return builder.build();
+    return builder.getCmdStatusCount() > 0 ? builder.build() : null;
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 47c249200d..12c196b1bd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -21,6 +21,8 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto
@@ -35,8 +37,11 @@
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus
     .CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands
+    .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
+import static java.lang.Math.min;
 import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +57,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
@@ -67,7 +73,7 @@ public class StateContext {
   private final DatanodeStateMachine parent;
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
-  private final Queue<GeneratedMessage> reports;
+  private final List<GeneratedMessage> reports;
   private final Queue<ContainerAction> containerActions;
   private final Queue<PipelineAction> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
@@ -174,19 +180,23 @@ public void setState(DatanodeStateMachine.DatanodeStates state) {
    * @param report report to be added
    */
   public void addReport(GeneratedMessage report) {
-    synchronized (reports) {
-      reports.add(report);
+    if (report != null) {
+      synchronized (reports) {
+        reports.add(report);
+      }
     }
   }
 
   /**
-   * Returns the next report, or null if the report queue is empty.
+   * Adds the reports which could not be sent by heartbeat back to the
+   * reports list.
    *
-   * @return report
+   * @param reportsToPutBack list of reports which failed to be sent by
+   *                         heartbeat.
    */
-  public GeneratedMessage getNextReport() {
+  public void putBackReports(List<GeneratedMessage> reportsToPutBack) {
     synchronized (reports) {
-      return reports.poll();
+      reports.addAll(0, reportsToPutBack);
     }
   }
 
@@ -207,19 +217,14 @@ public List<GeneratedMessage> getAllAvailableReports() {
    * @return List<GeneratedMessage>
    */
   public List<GeneratedMessage> getReports(int maxLimit) {
-    List<GeneratedMessage> reportList = new ArrayList<>();
+    List<GeneratedMessage> reportsToReturn = new LinkedList<>();
     synchronized (reports) {
-      if (!reports.isEmpty()) {
-        int size = reports.size();
-        int limit = size > maxLimit ? maxLimit : size;
-        for (int count = 0; count < limit; count++) {
-          GeneratedMessage report = reports.poll();
-          Preconditions.checkNotNull(report);
-          reportList.add(report);
-        }
-      }
-      return reportList;
+      List<GeneratedMessage> tempList = reports.subList(
+          0, min(reports.size(), maxLimit));
+      reportsToReturn.addAll(tempList);
+      tempList.clear();
     }
+    return reportsToReturn;
   }
 
@@ -442,9 +447,14 @@ public void addCmdStatus(Long key, CommandStatus status) {
    * @param cmd - {@link SCMCommand}.
    */
   public void addCmdStatus(SCMCommand cmd) {
+    CommandStatusBuilder statusBuilder;
+    if (cmd.getType() == Type.deleteBlocksCommand) {
+      statusBuilder = new DeleteBlockCommandStatusBuilder();
+    } else {
+      statusBuilder = CommandStatusBuilder.newBuilder();
+    }
     this.addCmdStatus(cmd.getId(),
-        CommandStatusBuilder.newBuilder()
-            .setCmdId(cmd.getId())
+        statusBuilder.setCmdId(cmd.getId())
             .setStatus(Status.PENDING)
             .setType(cmd.getType())
             .build());
@@ -469,13 +479,13 @@ public void removeCommandStatus(Long cmdId) {
   /**
    * Updates status of a pending status command.
    * @param cmdId command id
-   * @param cmdExecuted SCMCommand
+   * @param cmdStatusUpdater Consumer to update command status.
    * @return true if command status updated successfully else false.
    */
-  public boolean updateCommandStatus(Long cmdId, boolean cmdExecuted) {
+  public boolean updateCommandStatus(Long cmdId,
+      Consumer<CommandStatus> cmdStatusUpdater) {
     if(cmdStatusMap.containsKey(cmdId)) {
-      cmdStatusMap.get(cmdId)
-          .setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
+      cmdStatusUpdater.accept(cmdStatusMap.get(cmdId));
       return true;
     }
     return false;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 0af591beb1..2c3db6195e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -117,7 +117,8 @@ public void handle(SCMCommand command, OzoneContainer container,
         cmdExecuted = false;
       }
     } finally {
-      updateCommandStatus(context, command, cmdExecuted, LOG);
+      updateCommandStatus(context, command,
+          (cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG);
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 71c25b5a5b..1ea0ea8451 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -23,9 +23,12 @@
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 
+import java.util.function.Consumer;
+
 /**
  * Generic interface for handlers.
  */
@@ -63,8 +66,8 @@ void handle(SCMCommand command, OzoneContainer container,
    * Default implementation for updating command status.
    */
   default void updateCommandStatus(StateContext context, SCMCommand command,
-      boolean cmdExecuted, Logger log) {
-    if (!context.updateCommandStatus(command.getId(), cmdExecuted)) {
+      Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
+    if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
       log.debug("{} with Id:{} not found.", command.getType(),
           command.getId());
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 430b0ef31c..aa63fb48f4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -38,12 +38,12 @@
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
@@ -54,6 +54,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_NOT_FOUND;
@@ -63,7 +64,7 @@
  */
 public class DeleteBlocksCommandHandler implements CommandHandler {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
 
   private final ContainerSet containerSet;
@@ -83,6 +84,7 @@ public void handle(SCMCommand command, OzoneContainer container,
       StateContext context, SCMConnectionManager connectionManager) {
     cmdExecuted = false;
     long startTime = Time.monotonicNow();
+    ContainerBlocksDeletionACKProto blockDeletionACK = null;
     try {
       if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
         LOG.warn("Skipping handling command, expected command "
@@ -144,31 +146,28 @@ public void handle(SCMCommand command, OzoneContainer container,
               .setDnId(context.getParent().getDatanodeDetails()
                   .getUuid().toString());
         });
-      ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
+      blockDeletionACK = resultBuilder.build();
 
       // Send ACK back to SCM as long as meta updated
       // TODO Or we should wait until the blocks are actually deleted?
       if (!containerBlocks.isEmpty()) {
-        for (EndpointStateMachine endPoint : connectionManager.getValues()) {
-          try {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Sending following block deletion ACK to SCM");
-              for (DeleteBlockTransactionResult result :
-                  blockDeletionACK.getResultsList()) {
-                LOG.debug(result.getTxID() + " : " + result.getSuccess());
-              }
-            }
-            endPoint.getEndPoint()
-                .sendContainerBlocksDeletionACK(blockDeletionACK);
-          } catch (IOException e) {
-            LOG.error("Unable to send block deletion ACK to SCM {}",
-                endPoint.getAddress().toString(), e);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Sending following block deletion ACK to SCM");
+          for (DeleteBlockTransactionResult result : blockDeletionACK
+              .getResultsList()) {
+            LOG.debug(result.getTxID() + " : " + result.getSuccess());
           }
         }
       }
       cmdExecuted = true;
     } finally {
-      updateCommandStatus(context, command, cmdExecuted, LOG);
+      final ContainerBlocksDeletionACKProto deleteAck =
+          blockDeletionACK;
+      Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
+        cmdStatus.setStatus(cmdExecuted);
+        ((DeleteBlockCommandStatus) cmdStatus).setBlocksDeletionAck(deleteAck);
+      };
+      updateCommandStatus(context, command, statusUpdater, LOG);
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
     }
@@ -238,9 +237,9 @@ private void deleteKeyValueContainerBlocks(
         }
       }
 
-      containerDB.put(DFSUtil.string2Bytes(
-          OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + delTX.getContainerID()),
-          Longs.toByteArray(delTX.getTxID()));
+      containerDB
+          .put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
+              Longs.toByteArray(delTX.getTxID()));
       containerData
           .updateDeleteTransactionId(delTX.getTxID());
       // update pending deletion blocks count in in-memory container status
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index 09c379fd10..81d162d280 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -77,7 +77,8 @@ public void handle(SCMCommand command, OzoneContainer container,
       supervisor.addTask(replicationTask);
 
     } finally {
-      updateCommandStatus(context, command, true, LOG);
+      updateCommandStatus(context, command,
+          (cmdStatus) -> cmdStatus.setStatus(true), LOG);
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 5769e6d2f6..4fd72ec208 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -54,6 +54,7 @@
 
 import java.io.IOException;
 import java.time.ZonedDateTime;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -124,12 +125,12 @@ public void setDatanodeDetailsProto(DatanodeDetailsProto
   @Override
   public EndpointStateMachine.EndPointStates
       call() throws Exception {
     rpcEndpoint.lock();
+    SCMHeartbeatRequestProto.Builder requestBuilder = null;
     try {
       Preconditions.checkState(this.datanodeDetailsProto != null);
-      SCMHeartbeatRequestProto.Builder requestBuilder =
-          SCMHeartbeatRequestProto.newBuilder()
-              .setDatanodeDetails(datanodeDetailsProto);
+      requestBuilder = SCMHeartbeatRequestProto.newBuilder()
+          .setDatanodeDetails(datanodeDetailsProto);
       addReports(requestBuilder);
       addContainerActions(requestBuilder);
       addPipelineActions(requestBuilder);
@@ -139,6 +140,8 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
       rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
       rpcEndpoint.zeroMissedCount();
     } catch (IOException ex) {
+      // put back the reports which failed to be sent
+      putBackReports(requestBuilder);
       rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
@@ -146,6 +149,24 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
     return rpcEndpoint.getState();
   }
 
+  // TODO: Make it generic.
+  private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
+    List<GeneratedMessage> reports = new LinkedList<>();
+    if (requestBuilder.hasContainerReport()) {
+      reports.add(requestBuilder.getContainerReport());
+    }
+    if (requestBuilder.hasNodeReport()) {
+      reports.add(requestBuilder.getNodeReport());
+    }
+    if (requestBuilder.getCommandStatusReportsCount() != 0) {
+      for (GeneratedMessage msg : requestBuilder
+          .getCommandStatusReportsList()) {
+        reports.add(msg);
+      }
+    }
+    context.putBackReports(reports);
+  }
+
   /**
    * Adds all the available reports to heartbeat.
    *
@@ -158,7 +179,11 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
         SCMHeartbeatRequestProto.getDescriptor().getFields()) {
       String heartbeatFieldName = descriptor.getMessageType().getFullName();
       if (heartbeatFieldName.equals(reportName)) {
-        requestBuilder.setField(descriptor, report);
+        if (descriptor.isRepeated()) {
+          requestBuilder.addRepeatedField(descriptor, report);
+        } else {
+          requestBuilder.setField(descriptor, report);
+        }
       }
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 7c986f04b1..c3a41262ce 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -19,10 +19,13 @@
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -32,7 +35,10 @@
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -171,6 +177,20 @@ public void verifyContainerData(ContainerData containerData)
         KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
         KeyValueContainer kvContainer = new KeyValueContainer(
             kvContainerData, config);
+        MetadataStore containerDB = BlockUtils.getDB(kvContainerData, config);
+        MetadataKeyFilters.KeyPrefixFilter filter =
+            new MetadataKeyFilters.KeyPrefixFilter()
+                .addFilter(OzoneConsts.DELETING_KEY_PREFIX);
+        int numPendingDeletionBlocks =
+            containerDB.getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
+                .size();
+        kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
+        byte[] delTxnId = containerDB.get(
+            DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX));
+        if (delTxnId != null) {
+          kvContainerData
+              .updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
+        }
         containerSet.addContainer(kvContainer);
       } else {
         throw new StorageContainerException("Container File is corrupted. " +
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index 9296524fca..e3b5370d91 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -22,11 +22,6 @@
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -77,12 +72,4 @@ SCMRegisteredResponseProto register(
       ContainerReportsProto containerReportsRequestProto,
       PipelineReportsProto pipelineReports) throws IOException;
 
-  /**
-   * Used by datanode to send block deletion ACK to SCM.
-   * @param request block deletion transactions.
-   * @return block deletion transaction response.
-   * @throws IOException
-   */
-  ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto request) throws IOException;
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
index 0c4964ac4c..69337fb7c8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
@@ -20,11 +20,13 @@
 import java.util.UUID;
 
 import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
 
 /**
  * Command for the datanode with the destination address.
  */
-public class CommandForDatanode<T extends GeneratedMessage> {
+public class CommandForDatanode<T extends GeneratedMessage> implements
+    IdentifiableEventPayload {
 
   private final UUID datanodeId;
 
@@ -42,4 +44,8 @@ public UUID getDatanodeId() {
   public SCMCommand<T> getCommand() {
     return command;
   }
+
+  public long getId() {
+    return command.getId();
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
index 32cf7c222f..4b3ce840dc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
@@ -35,6 +35,13 @@ public class CommandStatus {
   private Status status;
   private String msg;
 
+  CommandStatus(Type type, Long cmdId, Status status, String msg) {
+    this.type = type;
+    this.cmdId = cmdId;
+    this.status = status;
+    this.msg = msg;
+  }
+
   public Type getType() {
     return type;
   }
@@ -60,6 +67,10 @@ public void setStatus(Status status) {
     this.status = status;
   }
 
+  public void setStatus(boolean cmdExecuted) {
+    setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
+  }
+
   /**
    * Returns a CommandStatus from the protocol buffers.
    *
@@ -72,7 +83,8 @@ public CommandStatus getFromProtoBuf(
         .setCmdId(cmdStatusProto.getCmdId())
         .setStatus(cmdStatusProto.getStatus())
         .setType(cmdStatusProto.getType())
-        .setMsg(cmdStatusProto.getMsg()).build();
+        .setMsg(cmdStatusProto.getMsg())
+        .build();
   }
   /**
    * Returns a CommandStatus from the protocol buffers.
@@ -95,20 +107,36 @@ public CommandStatus getFromProtoBuf(
   /**
    * Builder class for CommandStatus.
    */
-  public static final class CommandStatusBuilder {
+  public static class CommandStatusBuilder {
     private SCMCommandProto.Type type;
     private Long cmdId;
     private StorageContainerDatanodeProtocolProtos.CommandStatus.Status status;
     private String msg;
 
-    private CommandStatusBuilder() {
+    CommandStatusBuilder() {
     }
 
    public static CommandStatusBuilder newBuilder() {
      return new CommandStatusBuilder();
    }
 
+    public Type getType() {
+      return type;
+    }
+
+    public Long getCmdId() {
+      return cmdId;
+    }
+
+    public Status getStatus() {
+      return status;
+    }
+
+    public String getMsg() {
+      return msg;
+    }
+
     public CommandStatusBuilder setType(Type commandType) {
       this.type = commandType;
       return this;
@@ -130,12 +158,7 @@ public CommandStatusBuilder setMsg(String message) {
     }
 
     public CommandStatus build() {
-      CommandStatus commandStatus = new CommandStatus();
-      commandStatus.type = this.type;
-      commandStatus.msg = this.msg;
-      commandStatus.status = this.status;
-      commandStatus.cmdId = this.cmdId;
-      return commandStatus;
+      return new CommandStatus(type, cmdId, status, msg);
     }
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java
new file mode 100644
index 0000000000..2659ab3704
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+
+public class DeleteBlockCommandStatus extends CommandStatus {
+
+  private ContainerBlocksDeletionACKProto blocksDeletionAck = null;
+
+  public DeleteBlockCommandStatus(Type type, Long cmdId,
+      StorageContainerDatanodeProtocolProtos.CommandStatus.Status status,
+      String msg, ContainerBlocksDeletionACKProto blocksDeletionAck) {
+    super(type, cmdId, status, msg);
+    this.blocksDeletionAck = blocksDeletionAck;
+  }
+
+  public void setBlocksDeletionAck(
+      ContainerBlocksDeletionACKProto deletionAck) {
+    blocksDeletionAck = deletionAck;
+  }
+
+  @Override
+  public CommandStatus getFromProtoBuf(
+      StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) {
+    return DeleteBlockCommandStatusBuilder.newBuilder()
+        .setBlockDeletionAck(cmdStatusProto.getBlockDeletionAck())
+        .setCmdId(cmdStatusProto.getCmdId())
+        .setStatus(cmdStatusProto.getStatus())
+        .setType(cmdStatusProto.getType())
+        .setMsg(cmdStatusProto.getMsg())
+        .build();
+  }
+
+  @Override
+  public StorageContainerDatanodeProtocolProtos.CommandStatus
+      getProtoBufMessage() {
+    StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder =
+        StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder()
+            .setCmdId(this.getCmdId())
+            .setStatus(this.getStatus())
+            .setType(this.getType());
+    if (blocksDeletionAck != null) {
+      builder.setBlockDeletionAck(blocksDeletionAck);
+    }
+    if (this.getMsg() != null) {
+      builder.setMsg(this.getMsg());
+    }
+    return builder.build();
+  }
+
+  public static final class DeleteBlockCommandStatusBuilder
+      extends CommandStatusBuilder {
+    private ContainerBlocksDeletionACKProto blocksDeletionAck = null;
+
+    public static DeleteBlockCommandStatusBuilder newBuilder() {
+      return new DeleteBlockCommandStatusBuilder();
+    }
+
+    public DeleteBlockCommandStatusBuilder setBlockDeletionAck(
+        ContainerBlocksDeletionACKProto deletionAck) {
+      this.blocksDeletionAck = deletionAck;
+      return this;
+    }
+
+    @Override
+    public CommandStatus build() {
+      return new DeleteBlockCommandStatus(getType(), getCmdId(), getStatus(),
+          getMsg(), blocksDeletionAck);
+    }
+
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index b9cf6f9fc9..4e1e27e180 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -23,11 +23,6 @@
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 
@@ -169,16 +164,4 @@ public SCMRegisteredResponseProto register(
     return response;
   }
 
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {
-    final ContainerBlocksDeletionACKResponseProto resp;
-    try {
-      resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER,
-          deletedBlocks);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    return resp;
-  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index ed0182263a..862233276a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -32,11 +32,6 @@
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -96,15 +91,4 @@ public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller,
     }
   }
 
-
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      RpcController controller, ContainerBlocksDeletionACKProto request)
-      throws ServiceException {
-    try {
-      return impl.sendContainerBlocksDeletionACK(request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 78758cbe46..982029c91b 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -80,7 +80,7 @@ message SCMHeartbeatRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   optional NodeReportProto nodeReport = 2;
   optional ContainerReportsProto containerReport = 3;
-  optional CommandStatusReportsProto commandStatusReport = 4;
+  repeated CommandStatusReportsProto commandStatusReports = 4;
   optional ContainerActionsProto containerActions = 5;
   optional PipelineActionsProto pipelineActions = 6;
   optional PipelineReportsProto pipelineReports = 7;
@@ -145,6 +145,7 @@ message CommandStatus {
  required Status status = 2 [default = PENDING];
  required SCMCommandProto.Type type = 3;
  optional string msg = 4;
+  optional ContainerBlocksDeletionACKProto blockDeletionAck = 5;
 }
 
 message ContainerActionsProto {
@@ -272,10 +273,6 @@ message ContainerBlocksDeletionACKProto {
   required string dnId = 2;
 }
 
-// SendACK response returned by datanode to SCM, currently empty.
-message ContainerBlocksDeletionACKResponseProto {
-}
-
 /**
  This command asks the datanode to close a specific container.
 */
@@ -386,8 +383,4 @@ service StorageContainerDatanodeProtocolService {
   */
   rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
 
-  /**
-  * Sends the block deletion ACK to SCM.
-  */
-  rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 27b6272ef6..3e45596f37 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -22,6 +22,8 @@
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.VersionInfo;
@@ -29,11 +31,6 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto
@@ -196,10 +193,12 @@ private void sleepIfNeeded() {
       sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException {
     rpcCount.incrementAndGet();
     heartbeatCount.incrementAndGet();
-    if(heartbeat.hasCommandStatusReport()){
-      cmdStatusList.addAll(heartbeat.getCommandStatusReport()
-          .getCmdStatusList());
-      commandStatusReport.incrementAndGet();
+    if (heartbeat.getCommandStatusReportsCount() != 0) {
+      for (CommandStatusReportsProto statusReport : heartbeat
+          .getCommandStatusReportsList()) {
+        cmdStatusList.addAll(statusReport.getCmdStatusList());
+        commandStatusReport.incrementAndGet();
+      }
     }
     sleepIfNeeded();
     return SCMHeartbeatResponseProto.newBuilder().addAllCommands(
@@ -305,13 +304,6 @@ public int getContainerCountsForDatanode(DatanodeDetails datanodeDetails) {
     return 0;
   }
 
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto request) throws IOException {
-    return ContainerBlocksDeletionACKResponseProto
-        .newBuilder().getDefaultInstanceForType();
-  }
-
   /**
    * Reset the mock Scm for test to get a fresh start without rebuild MockScm.
   */
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 811599f01a..1e82326ff4 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -138,9 +138,7 @@ public void testCommandStatusPublisher() throws InterruptedException {
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("Unit test ReportManager Thread - %d").build());
     publisher.init(dummyContext, executorService);
-    Assert.assertEquals(0,
-        ((CommandStatusReportPublisher) publisher).getReport()
-            .getCmdStatusCount());
+    Assert.assertNull(((CommandStatusReportPublisher) publisher).getReport());
 
     // Insert to status object to state context map and then get the report.
     CommandStatus obj1 = CommandStatus.CommandStatusBuilder.newBuilder()
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 69a6a339bf..606940b510 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -77,7 +77,7 @@ public void testheartbeatWithoutReports() throws Exception {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -108,7 +108,7 @@ public void testheartbeatWithNodeReports() throws Exception {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertTrue(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -139,7 +139,7 @@ public void testheartbeatWithContainerReports() throws Exception {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertTrue(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -170,7 +170,7 @@ public void testheartbeatWithCommandStatusReports() throws Exception {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() != 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -201,7 +201,7 @@ public void testheartbeatWithContainerActions() throws Exception {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertTrue(heartbeat.hasContainerActions());
   }
 
@@ -235,7 +235,7 @@ public void testheartbeatWithAllReports() throws Exception {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertTrue(heartbeat.hasNodeReport());
     Assert.assertTrue(heartbeat.hasContainerReport());
-    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() != 0);
     Assert.assertTrue(heartbeat.hasContainerActions());
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 25420fe927..8702a42d26 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -61,7 +61,8 @@ public boolean addTransaction(DeletedBlocksTransaction tx,
     try {
       ContainerWithPipeline containerWithPipeline =
          mappingService.getContainerWithPipeline(tx.getContainerID());
-      if (containerWithPipeline.getContainerInfo().isContainerOpen()) {
+      if (containerWithPipeline.getContainerInfo().isContainerOpen()
+          || containerWithPipeline.getPipeline().isEmpty()) {
        return false;
      }
      pipeline = containerWithPipeline.getPipeline();
@@ -70,25 +71,19 @@ public boolean addTransaction(DeletedBlocksTransaction tx,
       return false;
     }
 
-    if (pipeline == null) {
-      SCMBlockDeletingService.LOG.warn(
-          "Container {} not found, continue to process next",
-          tx.getContainerID());
-      return false;
-    }
-
+    boolean success = false;
     for (DatanodeDetails dd : pipeline.getMachines()) {
       UUID dnID = dd.getUuid();
       if (dnsWithTransactionCommitted == null ||
           !dnsWithTransactionCommitted.contains(dnID)) {
         // Transaction need not be sent to dns which have already committed it
-        addTransactionToDN(dnID, tx);
+        success = addTransactionToDN(dnID, tx);
       }
     }
-    return true;
+    return success;
   }
 
-  private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
+  private boolean addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
     if (transactions.containsKey(dnID)) {
       List<DeletedBlocksTransaction> txs = transactions.get(dnID);
       if (txs != null && txs.size() < maximumAllowedTXNum) {
@@ -103,14 +98,17 @@ private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
         if (!hasContained) {
           txs.add(tx);
           currentTXNum++;
+          return true;
         }
       }
     } else {
       currentTXNum++;
       transactions.put(dnID, tx);
+      return true;
     }
     SCMBlockDeletingService.LOG
         .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
+    return false;
   }
 
   Set<UUID> getDatanodeIDs() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 49af65c9eb..68435d1c6a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -22,11 +22,17 @@
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
     .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command
+    .CommandStatusReportHandler.DeleteBlockStatus;
 import org.apache.hadoop.hdds.scm.container.Mapping;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -75,9 +81,10 @@
  * equally same chance to be retrieved which only depends on the nature
  * order of the transaction ID.
 */
-public class DeletedBlockLogImpl implements DeletedBlockLog {
+public class DeletedBlockLogImpl
+    implements DeletedBlockLog, EventHandler<DeleteBlockStatus> {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(DeletedBlockLogImpl.class);
 
   private static final byte[] LATEST_TXID =
@@ -123,7 +130,7 @@ public DeletedBlockLogImpl(Configuration conf, Mapping containerManager)
   }
 
   @VisibleForTesting
-  MetadataStore getDeletedStore() {
+  public MetadataStore getDeletedStore() {
     return deletedStore;
   }
 
@@ -269,6 +276,8 @@ public void commitTransactions(
           deletedStore.delete(Longs.toByteArray(txID));
         }
       }
+      LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
+          txID, containerId, dnID);
     } catch (IOException e) {
       LOG.warn("Could not commit delete block transaction: " +
           transactionResult.getTxID(), e);
@@ -407,4 +416,13 @@ public Map<Long, Long> getTransactions(
       lock.unlock();
     }
   }
+
+  @Override
+  public void onMessage(DeleteBlockStatus deleteBlockStatus,
+      EventPublisher publisher) {
+    ContainerBlocksDeletionACKProto ackProto =
+        deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
+    commitTransactions(ackProto.getResultsList(),
+        UUID.fromString(ackProto.getDnId()));
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index de3fe26682..b85d77f084 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -165,7 +165,7 @@ public EmptyTaskResult call() throws Exception {
           // We should stop caching new commands if num of un-processed
           // command is bigger than a limit, e.g 50. In case datanode goes
           // offline for sometime, the cached commands be flooded.
-          eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+          eventPublisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND,
              new CommandForDatanode<>(dnId, new DeleteBlocksCommand(dnTXs)));
           LOG.debug(
              "Added delete block command for datanode {} in the queue,"
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index 054665a1b5..c0de3820bf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -63,8 +63,10 @@ public void onMessage(CommandStatusReportFromDatanode report,
           CloseContainerStatus(cmdStatus));
       break;
     case deleteBlocksCommand:
-      publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new
-          DeleteBlockCommandStatus(cmdStatus));
+      if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
+        publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
+            new DeleteBlockStatus(cmdStatus));
+      }
       break;
     default:
       LOGGER.debug("CommandStatus of type:{} not handled in " +
@@ -120,8 +122,8 @@ public CloseContainerStatus(CommandStatus cmdStatus) {
   /**
    * Wrapper event for DeleteBlock Command.
    */
-  public static class DeleteBlockCommandStatus extends CommandStatusEvent {
-    public DeleteBlockCommandStatus(CommandStatus cmdStatus) {
+  public static class DeleteBlockStatus extends CommandStatusEvent {
+    public DeleteBlockStatus(CommandStatus cmdStatus) {
       super(cmdStatus);
     }
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index eb0a0b44e2..71e17e9982 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -33,7 +33,6 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -176,7 +175,10 @@ public ContainerInfo getContainer(final long containerID) throws
   }
 
   /**
-   * Returns the ContainerInfo from the container ID.
+   * Returns the ContainerInfo and pipeline from the containerID. If the
+   * container has no available replicas in datanodes, it returns a pipeline
+   * with no datanodes and an empty leaderID. Pipeline#isEmpty can be used
+   * to check for an empty pipeline.
    *
    * @param containerID - ID of container.
   * @return - ContainerWithPipeline such as creation state and the pipeline.
@@ -200,6 +202,7 @@ public ContainerWithPipeline getContainerWithPipeline(long containerID) contInfo = ContainerInfo.fromProtobuf(temp); Pipeline pipeline; + String leaderId = ""; if (contInfo.isContainerOpen()) { // If pipeline with given pipeline Id already exist return it pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID()); @@ -207,14 +210,12 @@ public ContainerWithPipeline getContainerWithPipeline(long containerID) // For close containers create pipeline from datanodes with replicas Set dnWithReplicas = containerStateManager .getContainerReplicas(contInfo.containerID()); - if (dnWithReplicas.size() == 0) { - throw new SCMException("Can't create a pipeline for container with " - + "no replica.", ResultCodes.NO_REPLICA_FOUND); + if (!dnWithReplicas.isEmpty()) { + leaderId = dnWithReplicas.iterator().next().getUuidString(); } - pipeline = - new Pipeline(dnWithReplicas.iterator().next().getUuidString(), - contInfo.getState(), ReplicationType.STAND_ALONE, - contInfo.getReplicationFactor(), PipelineID.randomId()); + pipeline = new Pipeline(leaderId, contInfo.getState(), + ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(), + PipelineID.randomId()); dnWithReplicas.forEach(pipeline::addMember); } return new ContainerWithPipeline(contInfo, pipeline); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 745e052be3..77b87132a4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -23,8 +23,7 @@ import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler .CloseContainerStatus; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler - .DeleteBlockCommandStatus; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler .ReplicationStatus; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler @@ -128,6 +127,10 @@ public final class SCMEvents { public static final Event DATANODE_COMMAND = new TypedEvent<>(CommandForDatanode.class, "Datanode_Command"); + public static final TypedEvent + RETRIABLE_DATANODE_COMMAND = + new TypedEvent<>(CommandForDatanode.class, "Retriable_Datanode_Command"); + /** * A Close Container Event can be triggered under many condition. Some of them * are: 1. A Container is full, then we stop writing further information to @@ -179,7 +182,7 @@ public final class SCMEvents { * status for Replication SCMCommand is received. */ public static final Event REPLICATION_STATUS = new - TypedEvent<>(ReplicationStatus.class, "ReplicateCommandStatus"); + TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status"); /** * This event will be triggered by CommandStatusReportHandler whenever a * status for CloseContainer SCMCommand is received. @@ -187,15 +190,15 @@ public final class SCMEvents { public static final Event CLOSE_CONTAINER_STATUS = new TypedEvent<>(CloseContainerStatus.class, - "CloseContainerCommandStatus"); + "Close_Container_Command_Status"); /** * This event will be triggered by CommandStatusReportHandler whenever a * status for DeleteBlock SCMCommand is received. 
*/ - public static final Event + public static final TypedEvent DELETE_BLOCK_STATUS = - new TypedEvent<>(DeleteBlockCommandStatus.class, - "DeleteBlockCommandStatus"); + new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class, + "Delete_Block_Status"); /** * This event will be triggered while processing container reports from DN @@ -203,7 +206,7 @@ public final class SCMEvents { * deleteTransactionID on SCM. */ public static final Event PENDING_DELETE_STATUS = - new TypedEvent<>(PendingDeleteStatusList.class, "PendingDeleteStatus"); + new TypedEvent<>(PendingDeleteStatusList.class, "Pending_Delete_Status"); /** * This is the command for ReplicationManager to handle under/over diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index e65de8ba35..d9a0875385 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -121,10 +121,13 @@ public List dispatch(SCMHeartbeatRequestProto heartbeat) { heartbeat.getPipelineActions())); } - if (heartbeat.hasCommandStatusReport()) { - eventPublisher.fireEvent(CMD_STATUS_REPORT, - new CommandStatusReportFromDatanode(datanodeDetails, - heartbeat.getCommandStatusReport())); + if (heartbeat.getCommandStatusReportsCount() != 0) { + for (CommandStatusReportsProto commandStatusReport : heartbeat + .getCommandStatusReportsList()) { + eventPublisher.fireEvent(CMD_STATUS_REPORT, + new CommandStatusReportFromDatanode(datanodeDetails, + commandStatusReport)); + } } return commands; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 4a0d3e5f70..9c6fa88e0c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -49,15 +49,6 @@ .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos - .ContainerBlocksDeletionACKResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos - .ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; - import static org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto @@ -100,7 +91,6 @@ import java.net.InetSocketAddress; import java.util.LinkedList; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY; @@ -242,18 +232,6 @@ public SCMHeartbeatResponseProto sendHeartbeat( .addAllCommands(cmdResponses).build(); } - @Override - public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( - ContainerBlocksDeletionACKProto acks) throws IOException { - if (acks.getResultsCount() > 0) { - List resultList = 
acks.getResultsList(); - scm.getScmBlockManager().getDeletedBlockLog() - .commitTransactions(resultList, UUID.fromString(acks.getDnId())); - } - return ContainerBlocksDeletionACKResponseProto.newBuilder() - .getDefaultInstanceForType(); - } - /** * Returns a SCMCommandRepose from the SCM Command. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 52af62dd0e..bb72075305 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; @@ -68,6 +69,7 @@ import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; @@ -253,6 +255,13 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher", watcherTimeout); + RetriableDatanodeEventWatcher retriableDatanodeEventWatcher = + new RetriableDatanodeEventWatcher<>( + SCMEvents.RETRIABLE_DATANODE_COMMAND, + SCMEvents.DELETE_BLOCK_STATUS, + commandWatcherLeaseManager); + retriableDatanodeEventWatcher.start(eventQueue); + //TODO: support configurable containerPlacement policy ContainerPlacementPolicy containerPlacementPolicy = new SCMContainerPlacementCapacity(scmNodeManager, conf); @@ -282,6 +291,7 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { httpServer = new StorageContainerManagerHttpServer(conf); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); + eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler); eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler); @@ -296,6 +306,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { replicationStatus.getChillModeStatusListener()); eventQueue .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler); + eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, + (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionEventHandler); eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java new file mode 100644 index 0000000000..2a50bca9dc --- /dev/null +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.CommandStatusEvent;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * EventWatcher for start events and completion events with payload of type
+ * RetriablePayload and RetriableCompletionPayload respectively.
+ */
+public class RetriableDatanodeEventWatcher<T extends CommandStatusEvent>
+    extends EventWatcher<CommandForDatanode, T> {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RetriableDatanodeEventWatcher.class);
+
+  public RetriableDatanodeEventWatcher(Event<CommandForDatanode> startEvent,
+      Event<T> completionEvent, LeaseManager<Long> leaseManager) {
+    super(startEvent, completionEvent, leaseManager);
+  }
+
+  @Override
+  protected void onTimeout(EventPublisher publisher,
+      CommandForDatanode payload) {
+    LOG.info("RetriableDatanodeCommand type={} with id={} timed out. Retrying.",
+        payload.getCommand().getType(), payload.getId());
+    //put back to the original queue
+    publisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND, payload);
+  }
+
+  @Override
+  protected void onFinished(EventPublisher publisher,
+      CommandForDatanode payload) {
+
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java
new file mode 100644
index 0000000000..b1d28386a4
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
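Read with the StorageContainerManager wiring above, the new watcher gives delete-block commands retry semantics: the RETRIABLE_DATANODE_COMMAND start event arms a lease, the Delete_Block_Status completion event (raised by CommandStatusReportHandler from incoming status reports, which is what replaces the removed sendContainerBlocksDeletionACK round trip) releases it, and an expired lease re-fires the start event. A toy model of that loop in plain JDK types; the Command record, the map, and the scheduler are stand-ins for the HDDS payloads, EventQueue, and LeaseManager, not the real APIs:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class RetryWatcherSketch {
  record Command(long id, String type) { }

  private final Map<Long, Command> tracked = new ConcurrentHashMap<>();
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();

  // "Start event": track the command and arm a timeout lease.
  void onStartEvent(Command cmd, long timeoutMs) {
    tracked.put(cmd.id(), cmd);
    timer.schedule(() -> {
      Command pending = tracked.remove(cmd.id());
      if (pending != null) {               // no completion seen: retry
        System.out.println("RetriableDatanodeCommand type=" + pending.type()
            + " with id=" + pending.id() + " timed out. Retrying.");
        onStartEvent(pending, timeoutMs);  // re-fire the start event
      }
    }, timeoutMs, TimeUnit.MILLISECONDS);
  }

  // "Completion event": stop tracking; like onFinished, nothing else to do.
  void onCompletionEvent(long commandId) {
    tracked.remove(commandId);
  }

  public static void main(String[] args) throws InterruptedException {
    RetryWatcherSketch watcher = new RetryWatcherSketch();
    watcher.onStartEvent(new Command(1L, "deleteBlocksCommand"), 100);
    Thread.sleep(250);                     // first lease expires, one retry fires
    watcher.onCompletionEvent(1L);         // completion stops further retries
    watcher.timer.shutdown();
  }
}
```

The empty onFinished in the real watcher is consistent with this picture: the completion event is already consumed by DeletedBlockLogImpl (wired above as the DELETE_BLOCK_STATUS handler), so presumably the watcher only needs to stop tracking the command.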
+ */ +package org.apache.hadoop.ozone.protocol.commands; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java index 65a2e29631..afa25e2af7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java @@ -67,20 +67,20 @@ public void testCommandStatusReport() { CommandStatusReportFromDatanode report = this.getStatusReport(Collections .emptyList()); cmdStatusReportHandler.onMessage(report, this); - assertFalse(logCapturer.getOutput().contains("DeleteBlockCommandStatus")); + assertFalse(logCapturer.getOutput().contains("Delete_Block_Status")); assertFalse(logCapturer.getOutput().contains( - "CloseContainerCommandStatus")); - assertFalse(logCapturer.getOutput().contains("ReplicateCommandStatus")); + "Close_Container_Command_Status")); + assertFalse(logCapturer.getOutput().contains("Replicate_Command_Status")); report = this.getStatusReport(this.getCommandStatusList()); cmdStatusReportHandler.onMessage(report, this); assertTrue(logCapturer.getOutput().contains("firing event of type " + - "DeleteBlockCommandStatus")); + "Delete_Block_Status")); assertTrue(logCapturer.getOutput().contains("firing event of type " + - "CloseContainerCommandStatus")); + "Close_Container_Command_Status")); assertTrue(logCapturer.getOutput().contains("firing event of type " + - "ReplicateCommandStatus")); + "Replicate_Command_Status")); assertTrue(logCapturer.getOutput().contains("type: " + "closeContainerCommand")); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java index 6a0b909179..f3cd4eaaba 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java @@ -127,7 +127,7 @@ public > void fireEvent( SCMHeartbeatRequestProto.newBuilder() .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) .setContainerReport(containerReport) - .setCommandStatusReport(commandStatusReport) + .addCommandStatusReports(commandStatusReport) .build(); dispatcher.dispatch(heartbeat); Assert.assertEquals(2, eventReceived.get()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index fe53bcc1a2..ac3ad5d3fa 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.junit.Assert.fail; @@ -188,6 +189,8 @@ public void testBlockDeletionTransactions() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100, 
TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100, + TimeUnit.MILLISECONDS); conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 3000, TimeUnit.MILLISECONDS); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index a129ed09cb..5b9bcbd4f2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -27,6 +27,7 @@ .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdfs.DFSUtil; @@ -46,13 +47,14 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.ozShell.TestOzoneShell; +import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.apache.hadoop.utils.MetadataStore; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; +import org.slf4j.event.Level; import java.io.File; import java.io.IOException; @@ -64,8 +66,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.lang.Math.max; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds .HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; @@ -76,14 +83,16 @@ public class TestBlockDeletion { private static OzoneConfiguration conf = null; private static ObjectStore store; private static MiniOzoneCluster cluster = null; - private static ContainerSet dnContainerSet = null; private static StorageContainerManager scm = null; private static OzoneManager om = null; private static Set containerIdsWithDeletedBlocks; + private static long maxTransactionId = 0; @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + GenericTestUtils.setLogLevel(DeletedBlockLogImpl.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(SCMBlockDeletingService.LOG, Level.DEBUG); String path = GenericTestUtils.getTempPath(TestOzoneShell.class.getSimpleName()); @@ -94,6 +103,11 @@ public static void init() throws Exception { TimeUnit.MILLISECONDS); conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200, + TimeUnit.MILLISECONDS); + 
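The intervals configured here and immediately below appear to be ordered deliberately: status reports (200 ms) should arrive well inside the watcher timeout (1 s, set next), which in turn should expire before the 3 s stale-node deadline, so a command sent to a dead datanode is retried instead of sitting unacknowledged. A quick standalone check of those relations; the hard-coded values are taken from this test setup and are illustrative only:

```java
final class TimingSanityCheck {
  public static void main(String[] args) {
    long statusReportMs = 200;    // HDDS_COMMAND_STATUS_REPORT_INTERVAL
    long watcherTimeoutMs = 1000; // HDDS_SCM_WATCHER_TIMEOUT
    long staleNodeMs = 3000;      // OZONE_SCM_STALENODE_INTERVAL (3 s)

    if (statusReportMs >= watcherTimeoutMs) {
      throw new IllegalStateException(
          "a live datanode could not report status within one watcher lease");
    }
    if (watcherTimeoutMs >= staleNodeMs) {
      throw new IllegalStateException(
          "commands would not be retried before the node is marked stale");
    }
    System.out.println("interval ordering holds");
  }
}
```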
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, + 3, TimeUnit.SECONDS); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(1) @@ -101,15 +115,12 @@ public static void init() throws Exception { .build(); cluster.waitForClusterToBeReady(); store = OzoneClientFactory.getRpcClient(conf).getObjectStore(); - dnContainerSet = cluster.getHddsDatanodes().get(0) - .getDatanodeStateMachine().getContainer().getContainerSet(); om = cluster.getOzoneManager(); scm = cluster.getStorageContainerManager(); containerIdsWithDeletedBlocks = new HashSet<>(); } - @Test(timeout = 60000) - @Ignore("Until delete background service is fixed.") + @Test public void testBlockDeletion() throws IOException, InterruptedException, TimeoutException { String volumeName = UUID.randomUUID().toString(); @@ -154,7 +165,9 @@ public void testBlockDeletion() Assert .assertTrue( OzoneTestUtils.closeContainers(omKeyLocationInfoGroupList, scm)); - Thread.sleep(5000); + + waitForDatanodeCommandRetry(); + waitForDatanodeBlockDeletionStart(); // The blocks should be deleted in the DN. Assert.assertTrue(verifyBlocksDeleted(omKeyLocationInfoGroupList)); @@ -169,11 +182,52 @@ public void testBlockDeletion() matchContainerTransactionIds(); // verify PENDING_DELETE_STATUS event is fired - verifyBlockDeletionEvent(); + verifyPendingDeleteEvent(); + + // Verify transactions committed + verifyTransactionsCommitted(); } - private void verifyBlockDeletionEvent() + private void waitForDatanodeBlockDeletionStart() + throws TimeoutException, InterruptedException { + LogCapturer logCapturer = + LogCapturer.captureLogs(DeleteBlocksCommandHandler.LOG); + logCapturer.clearOutput(); + GenericTestUtils.waitFor(() -> logCapturer.getOutput() + .contains("Start to delete container block"), + 500, 10000); + Thread.sleep(1000); + } + + /** + * Waits for datanode command to be retried when datanode is dead. 
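The wait helpers in this test, including the one whose javadoc opens above and whose body follows, share a capture-then-poll shape: snapshot a logger's output, then poll until an expected substring appears or a deadline passes (GenericTestUtils.waitFor does the polling in the real code). A generic sketch of that shape using only JDK types; the log line matched in main is the one waitForDatanodeBlockDeletionStart looks for:

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class WaitForSketch {
  static void waitFor(Supplier<Boolean> condition, long pollMs, long timeoutMs)
      throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!condition.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(pollMs);
    }
  }

  public static void main(String[] args) throws Exception {
    StringBuffer log = new StringBuffer();  // stands in for LogCapturer output
    new Thread(() -> log.append("Start to delete container block")).start();
    waitFor(() -> log.toString().contains("Start to delete container block"),
        50, 5000);
    System.out.println("saw the expected log line");
  }
}
```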
+ */ + private void waitForDatanodeCommandRetry() + throws TimeoutException, InterruptedException { + cluster.shutdownHddsDatanode(0); + LogCapturer logCapturer = + LogCapturer.captureLogs(RetriableDatanodeEventWatcher.LOG); + logCapturer.clearOutput(); + GenericTestUtils.waitFor(() -> logCapturer.getOutput() + .contains("RetriableDatanodeCommand type=deleteBlocksCommand"), + 500, 5000); + cluster.restartHddsDatanode(0); + } + + private void verifyTransactionsCommitted() throws IOException { + DeletedBlockLogImpl deletedBlockLog = + (DeletedBlockLogImpl) scm.getScmBlockManager().getDeletedBlockLog(); + for (int txnID = 1; txnID <= maxTransactionId; txnID++) { + Assert.assertNull( + deletedBlockLog.getDeletedStore().get(Longs.toByteArray(txnID))); + } + } + + private void verifyPendingDeleteEvent() throws IOException, InterruptedException { + ContainerSet dnContainerSet = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet(); LogCapturer logCapturer = LogCapturer.captureLogs(SCMBlockDeletingService.LOG); // Create dummy container reports with deleteTransactionId set as 0 @@ -209,6 +263,9 @@ private void verifyBlockDeletionEvent() } private void matchContainerTransactionIds() throws IOException { + ContainerSet dnContainerSet = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet(); List containerDataList = new ArrayList<>(); dnContainerSet.listContainer(0, 10000, containerDataList); for (ContainerData containerData : containerDataList) { @@ -216,6 +273,8 @@ private void matchContainerTransactionIds() throws IOException { if (containerIdsWithDeletedBlocks.contains(containerId)) { Assert.assertTrue( scm.getContainerInfo(containerId).getDeleteTransactionId() > 0); + maxTransactionId = max(maxTransactionId, + scm.getContainerInfo(containerId).getDeleteTransactionId()); } else { Assert.assertEquals( scm.getContainerInfo(containerId).getDeleteTransactionId(), 0); @@ -230,6 +289,9 @@ private void matchContainerTransactionIds() throws IOException { private boolean verifyBlocksCreated( List omKeyLocationInfoGroups) throws IOException { + ContainerSet dnContainerSet = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet(); return OzoneTestUtils.performOperationOnKeyContainers((blockID) -> { try { MetadataStore db = BlockUtils.getDB((KeyValueContainerData) @@ -245,6 +307,9 @@ private boolean verifyBlocksCreated( private boolean verifyBlocksDeleted( List omKeyLocationInfoGroups) throws IOException { + ContainerSet dnContainerSet = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet(); return OzoneTestUtils.performOperationOnKeyContainers((blockID) -> { try { MetadataStore db = BlockUtils.getDB((KeyValueContainerData)
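Finally, the commit check at the heart of verifyTransactionsCommitted: once SCM has processed the Delete_Block_Status event for a transaction, that transaction should be gone from the deleted-block log's store, so a lookup by its id returns null. A compact standalone version of the assertion; the plain map stands in for the MetadataStore returned by getDeletedStore(), which the real test keys with Longs.toByteArray(txnID):

```java
import java.util.HashMap;
import java.util.Map;

final class CommitCheckSketch {
  // Stand-in for the deleted-block log's backing store, keyed by txn id.
  private final Map<Long, byte[]> deletedStore = new HashMap<>();

  // Every transaction id up to maxTransactionId must have been committed,
  // i.e. removed from the store, so each lookup must return null.
  void assertAllCommitted(long maxTransactionId) {
    for (long txnId = 1; txnId <= maxTransactionId; txnId++) {
      if (deletedStore.get(txnId) != null) {
        throw new AssertionError("transaction " + txnId + " was not committed");
      }
    }
  }

  public static void main(String[] args) {
    new CommitCheckSketch().assertAllCommitted(5);  // empty store: all committed
    System.out.println("all transactions committed");
  }
}
```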