HDDS-325. Add event watcher for delete blocks command. Contributed by Lokesh Jain.

Nanda kumar 2018-10-01 13:49:55 +05:30
parent fd6be5898a
commit f7ff8c051e
34 changed files with 503 additions and 223 deletions

View File

@ -171,6 +171,10 @@ public void resetPipeline() {
public Map<String, DatanodeDetails> getDatanodes() {
return datanodes;
}
public boolean isEmpty() {
return datanodes.isEmpty();
}
/**
* Returns the leader host.
*

View File

@ -82,6 +82,6 @@ protected CommandStatusReportsProto getReport() {
map.remove(key);
}
});
return builder.build();
return builder.getCmdStatusCount() > 0 ? builder.build() : null;
}
}

View File

@ -21,6 +21,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto
@ -35,8 +37,11 @@
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus
.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands
.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import static java.lang.Math.min;
import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,6 +57,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
@ -67,7 +73,7 @@ public class StateContext {
private final DatanodeStateMachine parent;
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private final Queue<GeneratedMessage> reports;
private final List<GeneratedMessage> reports;
private final Queue<ContainerAction> containerActions;
private final Queue<PipelineAction> pipelineActions;
private DatanodeStateMachine.DatanodeStates state;
@ -174,19 +180,23 @@ public void setState(DatanodeStateMachine.DatanodeStates state) {
* @param report report to be added
*/
public void addReport(GeneratedMessage report) {
if (report != null) {
synchronized (reports) {
reports.add(report);
}
}
}
/**
* Returns the next report, or null if the report queue is empty.
* Adds the reports which could not be sent by heartbeat back to the
* reports list.
*
* @return report
* @param reportsToPutBack list of reports which failed to be sent by
* heartbeat.
*/
public GeneratedMessage getNextReport() {
public void putBackReports(List<GeneratedMessage> reportsToPutBack) {
synchronized (reports) {
return reports.poll();
reports.addAll(0, reportsToPutBack);
}
}
@ -207,19 +217,14 @@ public List<GeneratedMessage> getAllAvailableReports() {
* @return List<reports>
*/
public List<GeneratedMessage> getReports(int maxLimit) {
List<GeneratedMessage> reportList = new ArrayList<>();
List<GeneratedMessage> reportsToReturn = new LinkedList<>();
synchronized (reports) {
if (!reports.isEmpty()) {
int size = reports.size();
int limit = size > maxLimit ? maxLimit : size;
for (int count = 0; count < limit; count++) {
GeneratedMessage report = reports.poll();
Preconditions.checkNotNull(report);
reportList.add(report);
}
}
return reportList;
List<GeneratedMessage> tempList = reports.subList(
0, min(reports.size(), maxLimit));
reportsToReturn.addAll(tempList);
tempList.clear();
}
return reportsToReturn;
}
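The new getReports drains up to maxLimit entries from the head of the synchronized list by taking a subList view and clearing it, which also removes those entries from the backing list. A minimal standalone sketch of the same idiom, with illustrative names that are not taken from the patch:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import static java.lang.Math.min;

public class ReportDrainSketch {
  private final List<String> reports = new ArrayList<>();

  /** Removes and returns up to maxLimit items from the head of the list. */
  public List<String> drain(int maxLimit) {
    List<String> drained = new LinkedList<>();
    synchronized (reports) {
      // subList is a view; clearing it deletes the items from 'reports'.
      List<String> head = reports.subList(0, min(reports.size(), maxLimit));
      drained.addAll(head);
      head.clear();
    }
    return drained;
  }
}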
@ -442,9 +447,14 @@ public void addCmdStatus(Long key, CommandStatus status) {
* @param cmd - {@link SCMCommand}.
*/
public void addCmdStatus(SCMCommand cmd) {
CommandStatusBuilder statusBuilder;
if (cmd.getType() == Type.deleteBlocksCommand) {
statusBuilder = new DeleteBlockCommandStatusBuilder();
} else {
statusBuilder = CommandStatusBuilder.newBuilder();
}
this.addCmdStatus(cmd.getId(),
CommandStatusBuilder.newBuilder()
.setCmdId(cmd.getId())
statusBuilder.setCmdId(cmd.getId())
.setStatus(Status.PENDING)
.setType(cmd.getType())
.build());
@ -469,13 +479,13 @@ public void removeCommandStatus(Long cmdId) {
/**
* Updates status of a pending status command.
* @param cmdId command id
* @param cmdExecuted SCMCommand
* @param cmdStatusUpdater Consumer to update command status.
* @return true if command status updated successfully else false.
*/
public boolean updateCommandStatus(Long cmdId, boolean cmdExecuted) {
public boolean updateCommandStatus(Long cmdId,
Consumer<CommandStatus> cmdStatusUpdater) {
if(cmdStatusMap.containsKey(cmdId)) {
cmdStatusMap.get(cmdId)
.setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
cmdStatusUpdater.accept(cmdStatusMap.get(cmdId));
return true;
}
return false;
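updateCommandStatus now takes a Consumer<CommandStatus> instead of a boolean, so each handler decides what to record on the pending status (for example both the executed flag and a deletion ACK). A hedged sketch of that call shape, using a stand-in status type since only the pattern matters here:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class CommandStatusMapSketch {
  /** Stand-in for CommandStatus with only the fields needed here. */
  public static class Status {
    boolean executed;
    String ack;
  }

  private final Map<Long, Status> cmdStatusMap = new ConcurrentHashMap<>();

  /** Applies the updater to a pending status; returns false if none exists. */
  public boolean updateCommandStatus(Long cmdId, Consumer<Status> updater) {
    Status status = cmdStatusMap.get(cmdId);
    if (status == null) {
      return false;
    }
    updater.accept(status);
    return true;
  }

  public void example() {
    cmdStatusMap.put(1L, new Status());
    // A handler can record more than a boolean in a single update.
    updateCommandStatus(1L, st -> {
      st.executed = true;
      st.ack = "deletionAck";
    });
  }
}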

View File

@ -117,7 +117,8 @@ public void handle(SCMCommand command, OzoneContainer container,
cmdExecuted = false;
}
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
updateCommandStatus(context, command,
(cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG);
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}

View File

@ -23,9 +23,12 @@
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import java.util.function.Consumer;
/**
* Generic interface for handlers.
*/
@ -63,8 +66,8 @@ void handle(SCMCommand command, OzoneContainer container,
* Default implementation for updating command status.
*/
default void updateCommandStatus(StateContext context, SCMCommand command,
boolean cmdExecuted, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdExecuted)) {
Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
log.debug("{} with Id:{} not found.", command.getType(),
command.getId());
}

View File

@ -38,12 +38,12 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
@ -54,6 +54,7 @@
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_NOT_FOUND;
@ -63,7 +64,7 @@
*/
public class DeleteBlocksCommandHandler implements CommandHandler {
private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
private final ContainerSet containerSet;
@ -83,6 +84,7 @@ public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
cmdExecuted = false;
long startTime = Time.monotonicNow();
ContainerBlocksDeletionACKProto blockDeletionACK = null;
try {
if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
LOG.warn("Skipping handling command, expected command "
@ -144,31 +146,28 @@ public void handle(SCMCommand command, OzoneContainer container,
.setDnId(context.getParent().getDatanodeDetails()
.getUuid().toString());
});
ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
blockDeletionACK = resultBuilder.build();
// Send ACK back to SCM as long as meta updated
// TODO Or we should wait until the blocks are actually deleted?
if (!containerBlocks.isEmpty()) {
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending following block deletion ACK to SCM");
for (DeleteBlockTransactionResult result :
blockDeletionACK.getResultsList()) {
for (DeleteBlockTransactionResult result : blockDeletionACK
.getResultsList()) {
LOG.debug(result.getTxID() + " : " + result.getSuccess());
}
}
endPoint.getEndPoint()
.sendContainerBlocksDeletionACK(blockDeletionACK);
} catch (IOException e) {
LOG.error("Unable to send block deletion ACK to SCM {}",
endPoint.getAddress().toString(), e);
}
}
}
cmdExecuted = true;
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
final ContainerBlocksDeletionACKProto deleteAck =
blockDeletionACK;
Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
cmdStatus.setStatus(cmdExecuted);
((DeleteBlockCommandStatus) cmdStatus).setBlocksDeletionAck(deleteAck);
};
updateCommandStatus(context, command, statusUpdater, LOG);
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
@ -238,8 +237,8 @@ private void deleteKeyValueContainerBlocks(
}
}
containerDB.put(DFSUtil.string2Bytes(
OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + delTX.getContainerID()),
containerDB
.put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
Longs.toByteArray(delTX.getTxID()));
containerData
.updateDeleteTransactionId(delTX.getTxID());

View File

@ -77,7 +77,8 @@ public void handle(SCMCommand command, OzoneContainer container,
supervisor.addTask(replicationTask);
} finally {
updateCommandStatus(context, command, true, LOG);
updateCommandStatus(context, command,
(cmdStatus) -> cmdStatus.setStatus(true), LOG);
}
}

View File

@ -54,6 +54,7 @@
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
@ -124,11 +125,11 @@ public void setDatanodeDetailsProto(DatanodeDetailsProto
@Override
public EndpointStateMachine.EndPointStates call() throws Exception {
rpcEndpoint.lock();
SCMHeartbeatRequestProto.Builder requestBuilder = null;
try {
Preconditions.checkState(this.datanodeDetailsProto != null);
SCMHeartbeatRequestProto.Builder requestBuilder =
SCMHeartbeatRequestProto.newBuilder()
requestBuilder = SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetailsProto);
addReports(requestBuilder);
addContainerActions(requestBuilder);
@ -139,6 +140,8 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount();
} catch (IOException ex) {
// put back the reports which failed to be sent
putBackReports(requestBuilder);
rpcEndpoint.logIfNeeded(ex);
} finally {
rpcEndpoint.unlock();
@ -146,6 +149,24 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
return rpcEndpoint.getState();
}
// TODO: Make it generic.
private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
List<GeneratedMessage> reports = new LinkedList<>();
if (requestBuilder.hasContainerReport()) {
reports.add(requestBuilder.getContainerReport());
}
if (requestBuilder.hasNodeReport()) {
reports.add(requestBuilder.getNodeReport());
}
if (requestBuilder.getCommandStatusReportsCount() != 0) {
for (GeneratedMessage msg : requestBuilder
.getCommandStatusReportsList()) {
reports.add(msg);
}
}
context.putBackReports(reports);
}
/**
* Adds all the available reports to heartbeat.
*
@ -158,11 +179,15 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
SCMHeartbeatRequestProto.getDescriptor().getFields()) {
String heartbeatFieldName = descriptor.getMessageType().getFullName();
if (heartbeatFieldName.equals(reportName)) {
if (descriptor.isRepeated()) {
requestBuilder.addRepeatedField(descriptor, report);
} else {
requestBuilder.setField(descriptor, report);
}
}
}
}
}
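addReports now has to distinguish repeated from singular heartbeat fields, because commandStatusReports became a repeated field: a matching report is appended with addRepeatedField rather than overwriting with setField. A standalone illustration of that protobuf reflection pattern (not the patch code; the extra JavaType check is an added safeguard, an assumption of this sketch):

import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.Message;

public final class ReportFieldSetter {
  private ReportFieldSetter() {
  }

  /**
   * Puts 'report' into the matching field of the heartbeat builder,
   * appending when the field is repeated and setting it when singular.
   */
  public static void addReport(Message.Builder requestBuilder,
      GeneratedMessage report) {
    String reportName = report.getDescriptorForType().getFullName();
    for (FieldDescriptor descriptor
        : requestBuilder.getDescriptorForType().getFields()) {
      if (descriptor.getJavaType() == FieldDescriptor.JavaType.MESSAGE
          && descriptor.getMessageType().getFullName().equals(reportName)) {
        if (descriptor.isRepeated()) {
          requestBuilder.addRepeatedField(descriptor, report);
        } else {
          requestBuilder.setField(descriptor, report);
        }
      }
    }
  }
}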
/**
* Adds all the pending ContainerActions to the heartbeat.

View File

@ -19,10 +19,13 @@
package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@ -32,7 +35,10 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -171,6 +177,20 @@ public void verifyContainerData(ContainerData containerData)
KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
KeyValueContainer kvContainer = new KeyValueContainer(
kvContainerData, config);
MetadataStore containerDB = BlockUtils.getDB(kvContainerData, config);
MetadataKeyFilters.KeyPrefixFilter filter =
new MetadataKeyFilters.KeyPrefixFilter()
.addFilter(OzoneConsts.DELETING_KEY_PREFIX);
int numPendingDeletionBlocks =
containerDB.getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
.size();
kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
byte[] delTxnId = containerDB.get(
DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX));
if (delTxnId != null) {
kvContainerData
.updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
}
containerSet.addContainer(kvContainer);
} else {
throw new StorageContainerException("Container File is corrupted. " +

View File

@ -22,11 +22,6 @@
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos
.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@ -77,12 +72,4 @@ SCMRegisteredResponseProto register(
ContainerReportsProto containerReportsRequestProto,
PipelineReportsProto pipelineReports) throws IOException;
/**
* Used by datanode to send block deletion ACK to SCM.
* @param request block deletion transactions.
* @return block deletion transaction response.
* @throws IOException
*/
ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto request) throws IOException;
}

View File

@ -20,11 +20,13 @@
import java.util.UUID;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
/**
* Command for the datanode with the destination address.
*/
public class CommandForDatanode<T extends GeneratedMessage> {
public class CommandForDatanode<T extends GeneratedMessage> implements
IdentifiableEventPayload {
private final UUID datanodeId;
@ -42,4 +44,8 @@ public UUID getDatanodeId() {
public SCMCommand<T> getCommand() {
return command;
}
public long getId() {
return command.getId();
}
}

View File

@ -35,6 +35,13 @@ public class CommandStatus {
private Status status;
private String msg;
CommandStatus(Type type, Long cmdId, Status status, String msg) {
this.type = type;
this.cmdId = cmdId;
this.status = status;
this.msg = msg;
}
public Type getType() {
return type;
}
@ -60,6 +67,10 @@ public void setStatus(Status status) {
this.status = status;
}
public void setStatus(boolean cmdExecuted) {
setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
}
/**
* Returns a CommandStatus from the protocol buffers.
*
@ -72,7 +83,8 @@ public CommandStatus getFromProtoBuf(
.setCmdId(cmdStatusProto.getCmdId())
.setStatus(cmdStatusProto.getStatus())
.setType(cmdStatusProto.getType())
.setMsg(cmdStatusProto.getMsg()).build();
.setMsg(cmdStatusProto.getMsg())
.build();
}
/**
* Returns a CommandStatus from the protocol buffers.
@ -95,20 +107,36 @@ public CommandStatus getFromProtoBuf(
/**
* Builder class for CommandStatus.
*/
public static final class CommandStatusBuilder {
public static class CommandStatusBuilder {
private SCMCommandProto.Type type;
private Long cmdId;
private StorageContainerDatanodeProtocolProtos.CommandStatus.Status status;
private String msg;
private CommandStatusBuilder() {
CommandStatusBuilder() {
}
public static CommandStatusBuilder newBuilder() {
return new CommandStatusBuilder();
}
public Type getType() {
return type;
}
public Long getCmdId() {
return cmdId;
}
public Status getStatus() {
return status;
}
public String getMsg() {
return msg;
}
public CommandStatusBuilder setType(Type commandType) {
this.type = commandType;
return this;
@ -130,12 +158,7 @@ public CommandStatusBuilder setMsg(String message) {
}
public CommandStatus build() {
CommandStatus commandStatus = new CommandStatus();
commandStatus.type = this.type;
commandStatus.msg = this.msg;
commandStatus.status = this.status;
commandStatus.cmdId = this.cmdId;
return commandStatus;
return new CommandStatus(type, cmdId, status, msg);
}
}
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
public class DeleteBlockCommandStatus extends CommandStatus {
private ContainerBlocksDeletionACKProto blocksDeletionAck = null;
public DeleteBlockCommandStatus(Type type, Long cmdId,
StorageContainerDatanodeProtocolProtos.CommandStatus.Status status,
String msg, ContainerBlocksDeletionACKProto blocksDeletionAck) {
super(type, cmdId, status, msg);
this.blocksDeletionAck = blocksDeletionAck;
}
public void setBlocksDeletionAck(
ContainerBlocksDeletionACKProto deletionAck) {
blocksDeletionAck = deletionAck;
}
@Override
public CommandStatus getFromProtoBuf(
StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) {
return DeleteBlockCommandStatusBuilder.newBuilder()
.setBlockDeletionAck(cmdStatusProto.getBlockDeletionAck())
.setCmdId(cmdStatusProto.getCmdId())
.setStatus(cmdStatusProto.getStatus())
.setType(cmdStatusProto.getType())
.setMsg(cmdStatusProto.getMsg())
.build();
}
@Override
public StorageContainerDatanodeProtocolProtos.CommandStatus getProtoBufMessage() {
StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder =
StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder()
.setCmdId(this.getCmdId())
.setStatus(this.getStatus())
.setType(this.getType());
if (blocksDeletionAck != null) {
builder.setBlockDeletionAck(blocksDeletionAck);
}
if (this.getMsg() != null) {
builder.setMsg(this.getMsg());
}
return builder.build();
}
public static final class DeleteBlockCommandStatusBuilder
extends CommandStatusBuilder {
private ContainerBlocksDeletionACKProto blocksDeletionAck = null;
public static DeleteBlockCommandStatusBuilder newBuilder() {
return new DeleteBlockCommandStatusBuilder();
}
public DeleteBlockCommandStatusBuilder setBlockDeletionAck(
ContainerBlocksDeletionACKProto deletionAck) {
this.blocksDeletionAck = deletionAck;
return this;
}
@Override
public CommandStatus build() {
return new DeleteBlockCommandStatus(getType(), getCmdId(), getStatus(),
getMsg(), blocksDeletionAck);
}
}
}
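A short usage sketch of the new status type: a pending status is created through DeleteBlockCommandStatusBuilder and later completed with the deletion ACK, mirroring what DeleteBlocksCommandHandler does through its Consumer. Class and method names below are illustrative only:

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus
    .DeleteBlockCommandStatusBuilder;

public final class DeleteBlockStatusUsage {
  private DeleteBlockStatusUsage() {
  }

  /** Creates the PENDING status stored when the command is queued. */
  public static DeleteBlockCommandStatus newPendingStatus(long cmdId) {
    return (DeleteBlockCommandStatus) DeleteBlockCommandStatusBuilder
        .newBuilder()
        .setCmdId(cmdId)
        .setType(Type.deleteBlocksCommand)
        .setStatus(Status.PENDING)
        .build();
  }

  /** Records the outcome and the ACK once the datanode has processed it. */
  public static void complete(DeleteBlockCommandStatus status,
      boolean cmdExecuted, ContainerBlocksDeletionACKProto ack) {
    status.setStatus(cmdExecuted);
    status.setBlocksDeletionAck(ack);
  }
}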

View File

@ -23,11 +23,6 @@
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos
.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
@ -169,16 +164,4 @@ public SCMRegisteredResponseProto register(
return response;
}
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {
final ContainerBlocksDeletionACKResponseProto resp;
try {
resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER,
deletedBlocks);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
return resp;
}
}

View File

@ -32,11 +32,6 @@
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos
.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@ -96,15 +91,4 @@ public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller,
}
}
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
RpcController controller, ContainerBlocksDeletionACKProto request)
throws ServiceException {
try {
return impl.sendContainerBlocksDeletionACK(request);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -80,7 +80,7 @@ message SCMHeartbeatRequestProto {
required DatanodeDetailsProto datanodeDetails = 1;
optional NodeReportProto nodeReport = 2;
optional ContainerReportsProto containerReport = 3;
optional CommandStatusReportsProto commandStatusReport = 4;
repeated CommandStatusReportsProto commandStatusReports = 4;
optional ContainerActionsProto containerActions = 5;
optional PipelineActionsProto pipelineActions = 6;
optional PipelineReportsProto pipelineReports = 7;
@ -145,6 +145,7 @@ message CommandStatus {
required Status status = 2 [default = PENDING];
required SCMCommandProto.Type type = 3;
optional string msg = 4;
optional ContainerBlocksDeletionACKProto blockDeletionAck = 5;
}
message ContainerActionsProto {
@ -272,10 +273,6 @@ message ContainerBlocksDeletionACKProto {
required string dnId = 2;
}
// SendACK response returned by datanode to SCM, currently empty.
message ContainerBlocksDeletionACKResponseProto {
}
/**
This command asks the datanode to close a specific container.
*/
@ -386,8 +383,4 @@ service StorageContainerDatanodeProtocolService {
*/
rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
/**
* Sends the block deletion ACK to SCM.
*/
rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
}
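Since commandStatusReport changed from a singular optional field to the repeated commandStatusReports field, one heartbeat can now carry several CommandStatusReportsProto messages and the dedicated block-deletion ACK RPC is removed. A minimal Java-side sketch of building and reading such a heartbeat; the datanodeDetails argument is a placeholder for a fully populated proto:

import java.util.List;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;

public final class HeartbeatReportSketch {
  private HeartbeatReportSketch() {
  }

  /** Datanode side: append every pending status report to the heartbeat. */
  public static SCMHeartbeatRequestProto buildHeartbeat(
      DatanodeDetailsProto datanodeDetails,
      List<CommandStatusReportsProto> statusReports) {
    SCMHeartbeatRequestProto.Builder builder =
        SCMHeartbeatRequestProto.newBuilder()
            .setDatanodeDetails(datanodeDetails);
    for (CommandStatusReportsProto report : statusReports) {
      builder.addCommandStatusReports(report);
    }
    return builder.build();
  }

  /** SCM side: iterate the repeated field, as the dispatcher now does. */
  public static int countCommandStatuses(SCMHeartbeatRequestProto heartbeat) {
    int count = 0;
    for (CommandStatusReportsProto report
        : heartbeat.getCommandStatusReportsList()) {
      count += report.getCmdStatusCount();
    }
    return count;
  }
}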

View File

@ -22,6 +22,8 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.scm.VersionInfo;
@ -29,11 +31,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos
.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
@ -196,11 +193,13 @@ private void sleepIfNeeded() {
sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException {
rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet();
if(heartbeat.hasCommandStatusReport()){
cmdStatusList.addAll(heartbeat.getCommandStatusReport()
.getCmdStatusList());
if (heartbeat.getCommandStatusReportsCount() != 0) {
for (CommandStatusReportsProto statusReport : heartbeat
.getCommandStatusReportsList()) {
cmdStatusList.addAll(statusReport.getCmdStatusList());
commandStatusReport.incrementAndGet();
}
}
sleepIfNeeded();
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(
scmCommandRequests)
@ -305,13 +304,6 @@ public int getContainerCountsForDatanode(DatanodeDetails datanodeDetails) {
return 0;
}
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto request) throws IOException {
return ContainerBlocksDeletionACKResponseProto
.newBuilder().getDefaultInstanceForType();
}
/**
* Reset the mock Scm for test to get a fresh start without rebuilding MockScm.
*/

View File

@ -138,9 +138,7 @@ public void testCommandStatusPublisher() throws InterruptedException {
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Unit test ReportManager Thread - %d").build());
publisher.init(dummyContext, executorService);
Assert.assertEquals(0,
((CommandStatusReportPublisher) publisher).getReport()
.getCmdStatusCount());
Assert.assertNull(((CommandStatusReportPublisher) publisher).getReport());
// Insert to status object to state context map and then get the report.
CommandStatus obj1 = CommandStatus.CommandStatusBuilder.newBuilder()

View File

@ -77,7 +77,7 @@ public void testheartbeatWithoutReports() throws Exception {
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertFalse(heartbeat.hasNodeReport());
Assert.assertFalse(heartbeat.hasContainerReport());
Assert.assertFalse(heartbeat.hasCommandStatusReport());
Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
Assert.assertFalse(heartbeat.hasContainerActions());
}
@ -108,7 +108,7 @@ public void testheartbeatWithNodeReports() throws Exception {
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertTrue(heartbeat.hasNodeReport());
Assert.assertFalse(heartbeat.hasContainerReport());
Assert.assertFalse(heartbeat.hasCommandStatusReport());
Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
Assert.assertFalse(heartbeat.hasContainerActions());
}
@ -139,7 +139,7 @@ public void testheartbeatWithContainerReports() throws Exception {
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertFalse(heartbeat.hasNodeReport());
Assert.assertTrue(heartbeat.hasContainerReport());
Assert.assertFalse(heartbeat.hasCommandStatusReport());
Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
Assert.assertFalse(heartbeat.hasContainerActions());
}
@ -170,7 +170,7 @@ public void testheartbeatWithCommandStatusReports() throws Exception {
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertFalse(heartbeat.hasNodeReport());
Assert.assertFalse(heartbeat.hasContainerReport());
Assert.assertTrue(heartbeat.hasCommandStatusReport());
Assert.assertTrue(heartbeat.getCommandStatusReportsCount() != 0);
Assert.assertFalse(heartbeat.hasContainerActions());
}
@ -201,7 +201,7 @@ public void testheartbeatWithContainerActions() throws Exception {
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertFalse(heartbeat.hasNodeReport());
Assert.assertFalse(heartbeat.hasContainerReport());
Assert.assertFalse(heartbeat.hasCommandStatusReport());
Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
Assert.assertTrue(heartbeat.hasContainerActions());
}
@ -235,7 +235,7 @@ public void testheartbeatWithAllReports() throws Exception {
Assert.assertTrue(heartbeat.hasDatanodeDetails());
Assert.assertTrue(heartbeat.hasNodeReport());
Assert.assertTrue(heartbeat.hasContainerReport());
Assert.assertTrue(heartbeat.hasCommandStatusReport());
Assert.assertTrue(heartbeat.getCommandStatusReportsCount() != 0);
Assert.assertTrue(heartbeat.hasContainerActions());
}

View File

@ -61,7 +61,8 @@ public boolean addTransaction(DeletedBlocksTransaction tx,
try {
ContainerWithPipeline containerWithPipeline =
mappingService.getContainerWithPipeline(tx.getContainerID());
if (containerWithPipeline.getContainerInfo().isContainerOpen()) {
if (containerWithPipeline.getContainerInfo().isContainerOpen()
|| containerWithPipeline.getPipeline().isEmpty()) {
return false;
}
pipeline = containerWithPipeline.getPipeline();
@ -70,25 +71,19 @@ public boolean addTransaction(DeletedBlocksTransaction tx,
return false;
}
if (pipeline == null) {
SCMBlockDeletingService.LOG.warn(
"Container {} not found, continue to process next",
tx.getContainerID());
return false;
}
boolean success = false;
for (DatanodeDetails dd : pipeline.getMachines()) {
UUID dnID = dd.getUuid();
if (dnsWithTransactionCommitted == null ||
!dnsWithTransactionCommitted.contains(dnID)) {
// Transaction need not be sent to dns which have already committed it
addTransactionToDN(dnID, tx);
success = addTransactionToDN(dnID, tx);
}
}
return true;
return success;
}
private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
private boolean addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
if (transactions.containsKey(dnID)) {
List<DeletedBlocksTransaction> txs = transactions.get(dnID);
if (txs != null && txs.size() < maximumAllowedTXNum) {
@ -103,14 +98,17 @@ private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
if (!hasContained) {
txs.add(tx);
currentTXNum++;
return true;
}
}
} else {
currentTXNum++;
transactions.put(dnID, tx);
return true;
}
SCMBlockDeletingService.LOG
.debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
return false;
}
Set<UUID> getDatanodeIDs() {

View File

@ -22,11 +22,17 @@
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.scm.command
.CommandStatusReportHandler.DeleteBlockStatus;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@ -75,9 +81,10 @@
* an equal chance to be retrieved, which only depends on the natural
* order of the transaction ID.
*/
public class DeletedBlockLogImpl implements DeletedBlockLog {
public class DeletedBlockLogImpl
implements DeletedBlockLog, EventHandler<DeleteBlockStatus> {
private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(DeletedBlockLogImpl.class);
private static final byte[] LATEST_TXID =
@ -123,7 +130,7 @@ public DeletedBlockLogImpl(Configuration conf, Mapping containerManager)
}
@VisibleForTesting
MetadataStore getDeletedStore() {
public MetadataStore getDeletedStore() {
return deletedStore;
}
@ -269,6 +276,8 @@ public void commitTransactions(
deletedStore.delete(Longs.toByteArray(txID));
}
}
LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
txID, containerId, dnID);
} catch (IOException e) {
LOG.warn("Could not commit delete block transaction: " +
transactionResult.getTxID(), e);
@ -407,4 +416,13 @@ public Map<Long, Long> getTransactions(
lock.unlock();
}
}
@Override
public void onMessage(DeleteBlockStatus deleteBlockStatus,
EventPublisher publisher) {
ContainerBlocksDeletionACKProto ackProto =
deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
commitTransactions(ackProto.getResultsList(),
UUID.fromString(ackProto.getDnId()));
}
}

View File

@ -165,7 +165,7 @@ public EmptyTaskResult call() throws Exception {
// We should stop caching new commands if the number of un-processed
// commands is bigger than a limit, e.g. 50. In case the datanode goes
// offline for some time, the cached commands can be flooded.
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
eventPublisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND,
new CommandForDatanode<>(dnId, new DeleteBlocksCommand(dnTXs)));
LOG.debug(
"Added delete block command for datanode {} in the queue,"

View File

@ -63,8 +63,10 @@ public void onMessage(CommandStatusReportFromDatanode report,
CloseContainerStatus(cmdStatus));
break;
case deleteBlocksCommand:
publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new
DeleteBlockCommandStatus(cmdStatus));
if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
new DeleteBlockStatus(cmdStatus));
}
break;
default:
LOGGER.debug("CommandStatus of type:{} not handled in " +
@ -120,8 +122,8 @@ public CloseContainerStatus(CommandStatus cmdStatus) {
/**
* Wrapper event for DeleteBlock Command.
*/
public static class DeleteBlockCommandStatus extends CommandStatusEvent {
public DeleteBlockCommandStatus(CommandStatus cmdStatus) {
public static class DeleteBlockStatus extends CommandStatusEvent {
public DeleteBlockStatus(CommandStatus cmdStatus) {
super(cmdStatus);
}
}

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -176,7 +175,10 @@ public ContainerInfo getContainer(final long containerID) throws
}
/**
* Returns the ContainerInfo from the container ID.
* Returns the ContainerInfo and pipeline from the containerID. If the container
* has no available replicas on the datanodes, it returns a pipeline with no
* datanodes and an empty leaderID. Pipeline#isEmpty can be used to check for
* an empty pipeline.
*
* @param containerID - ID of container.
* @return - ContainerWithPipeline such as creation state and the pipeline.
@ -200,6 +202,7 @@ public ContainerWithPipeline getContainerWithPipeline(long containerID)
contInfo = ContainerInfo.fromProtobuf(temp);
Pipeline pipeline;
String leaderId = "";
if (contInfo.isContainerOpen()) {
// If pipeline with given pipeline Id already exist return it
pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
@ -207,14 +210,12 @@ public ContainerWithPipeline getContainerWithPipeline(long containerID)
// For close containers create pipeline from datanodes with replicas
Set<DatanodeDetails> dnWithReplicas = containerStateManager
.getContainerReplicas(contInfo.containerID());
if (dnWithReplicas.size() == 0) {
throw new SCMException("Can't create a pipeline for container with "
+ "no replica.", ResultCodes.NO_REPLICA_FOUND);
if (!dnWithReplicas.isEmpty()) {
leaderId = dnWithReplicas.iterator().next().getUuidString();
}
pipeline =
new Pipeline(dnWithReplicas.iterator().next().getUuidString(),
contInfo.getState(), ReplicationType.STAND_ALONE,
contInfo.getReplicationFactor(), PipelineID.randomId());
pipeline = new Pipeline(leaderId, contInfo.getState(),
ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(),
PipelineID.randomId());
dnWithReplicas.forEach(pipeline::addMember);
}
return new ContainerWithPipeline(contInfo, pipeline);

View File

@ -23,8 +23,7 @@
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.CloseContainerStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.DeleteBlockCommandStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.ReplicationStatus;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
@ -128,6 +127,10 @@ public final class SCMEvents {
public static final Event<CommandForDatanode> DATANODE_COMMAND =
new TypedEvent<>(CommandForDatanode.class, "Datanode_Command");
public static final TypedEvent<CommandForDatanode>
RETRIABLE_DATANODE_COMMAND =
new TypedEvent<>(CommandForDatanode.class, "Retriable_Datanode_Command");
/**
* A Close Container Event can be triggered under many conditions. Some of them
* are: 1. A Container is full, then we stop writing further information to
@ -179,7 +182,7 @@ public final class SCMEvents {
* status for Replication SCMCommand is received.
*/
public static final Event<ReplicationStatus> REPLICATION_STATUS = new
TypedEvent<>(ReplicationStatus.class, "ReplicateCommandStatus");
TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status");
/**
* This event will be triggered by CommandStatusReportHandler whenever a
* status for CloseContainer SCMCommand is received.
@ -187,15 +190,15 @@ public final class SCMEvents {
public static final Event<CloseContainerStatus>
CLOSE_CONTAINER_STATUS =
new TypedEvent<>(CloseContainerStatus.class,
"CloseContainerCommandStatus");
"Close_Container_Command_Status");
/**
* This event will be triggered by CommandStatusReportHandler whenever a
* status for DeleteBlock SCMCommand is received.
*/
public static final Event<DeleteBlockCommandStatus>
public static final TypedEvent<CommandStatusReportHandler.DeleteBlockStatus>
DELETE_BLOCK_STATUS =
new TypedEvent<>(DeleteBlockCommandStatus.class,
"DeleteBlockCommandStatus");
new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class,
"Delete_Block_Status");
/**
* This event will be triggered while processing container reports from DN
@ -203,7 +206,7 @@ public final class SCMEvents {
* deleteTransactionID on SCM.
*/
public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
new TypedEvent<>(PendingDeleteStatusList.class, "PendingDeleteStatus");
new TypedEvent<>(PendingDeleteStatusList.class, "Pending_Delete_Status");
/**
* This is the command for ReplicationManager to handle under/over

View File

@ -121,10 +121,13 @@ public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
heartbeat.getPipelineActions()));
}
if (heartbeat.hasCommandStatusReport()) {
if (heartbeat.getCommandStatusReportsCount() != 0) {
for (CommandStatusReportsProto commandStatusReport : heartbeat
.getCommandStatusReportsList()) {
eventPublisher.fireEvent(CMD_STATUS_REPORT,
new CommandStatusReportFromDatanode(datanodeDetails,
heartbeat.getCommandStatusReport()));
commandStatusReport));
}
}
return commands;

View File

@ -49,15 +49,6 @@
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos
.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos
.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto
@ -100,7 +91,6 @@
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
@ -242,18 +232,6 @@ public SCMHeartbeatResponseProto sendHeartbeat(
.addAllCommands(cmdResponses).build();
}
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto acks) throws IOException {
if (acks.getResultsCount() > 0) {
List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
scm.getScmBlockManager().getDeletedBlockLog()
.commitTransactions(resultList, UUID.fromString(acks.getDnId()));
}
return ContainerBlocksDeletionACKResponseProto.newBuilder()
.getDefaultInstanceForType();
}
/**
* Returns an SCMCommandResponse from the SCM Command.
*

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@ -68,6 +69,7 @@
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
@ -253,6 +255,13 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
watcherTimeout);
RetriableDatanodeEventWatcher retriableDatanodeEventWatcher =
new RetriableDatanodeEventWatcher<>(
SCMEvents.RETRIABLE_DATANODE_COMMAND,
SCMEvents.DELETE_BLOCK_STATUS,
commandWatcherLeaseManager);
retriableDatanodeEventWatcher.start(eventQueue);
//TODO: support configurable containerPlacement policy
ContainerPlacementPolicy containerPlacementPolicy =
new SCMContainerPlacementCapacity(scmNodeManager, conf);
@ -282,6 +291,7 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
httpServer = new StorageContainerManagerHttpServer(conf);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
@ -296,6 +306,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
replicationStatus.getChillModeStatusListener());
eventQueue
.addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS,
(DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
pipelineActionEventHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.CommandStatusEvent;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventWatcher;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* EventWatcher for start events and completion events with payload of type
* CommandForDatanode and CommandStatusEvent respectively.
*/
public class RetriableDatanodeEventWatcher<T extends CommandStatusEvent>
extends EventWatcher<CommandForDatanode, T> {
public static final Logger LOG =
LoggerFactory.getLogger(RetriableDatanodeEventWatcher.class);
public RetriableDatanodeEventWatcher(Event<CommandForDatanode> startEvent,
Event<T> completionEvent, LeaseManager<Long> leaseManager) {
super(startEvent, completionEvent, leaseManager);
}
@Override
protected void onTimeout(EventPublisher publisher,
CommandForDatanode payload) {
LOG.info("RetriableDatanodeCommand type={} with id={} timed out. Retrying.",
payload.getCommand().getType(), payload.getId());
//put back to the original queue
publisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND, payload);
}
@Override
protected void onFinished(EventPublisher publisher,
CommandForDatanode payload) {
}
}
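How the watcher is meant to be wired and exercised, condensed from the StorageContainerManager and SCMBlockDeletingService changes in this diff; eventQueue, watcherTimeout, datanodeUuid and txs are placeholders supplied by the caller:

import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
    .DeleteBlockStatus;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;

public final class RetriableWatcherWiringSketch {
  private RetriableWatcherWiringSketch() {
  }

  public static void wireAndFire(EventQueue eventQueue, long watcherTimeout,
      UUID datanodeUuid, List<DeletedBlocksTransaction> txs) {
    // The lease manager is assumed to be started before leases are taken.
    LeaseManager<Long> leaseManager =
        new LeaseManager<>("CommandWatcher", watcherTimeout);
    leaseManager.start();

    RetriableDatanodeEventWatcher<DeleteBlockStatus> watcher =
        new RetriableDatanodeEventWatcher<>(
            SCMEvents.RETRIABLE_DATANODE_COMMAND,
            SCMEvents.DELETE_BLOCK_STATUS,
            leaseManager);
    watcher.start(eventQueue);

    // Commands fired on RETRIABLE_DATANODE_COMMAND are tracked by id;
    // onTimeout() re-fires them until a DELETE_BLOCK_STATUS completion
    // event with the same id arrives.
    eventQueue.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND,
        new CommandForDatanode<>(datanodeUuid, new DeleteBlocksCommand(txs)));
  }
}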

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;

View File

@ -67,20 +67,20 @@ public void testCommandStatusReport() {
CommandStatusReportFromDatanode report = this.getStatusReport(Collections
.emptyList());
cmdStatusReportHandler.onMessage(report, this);
assertFalse(logCapturer.getOutput().contains("DeleteBlockCommandStatus"));
assertFalse(logCapturer.getOutput().contains("Delete_Block_Status"));
assertFalse(logCapturer.getOutput().contains(
"CloseContainerCommandStatus"));
assertFalse(logCapturer.getOutput().contains("ReplicateCommandStatus"));
"Close_Container_Command_Status"));
assertFalse(logCapturer.getOutput().contains("Replicate_Command_Status"));
report = this.getStatusReport(this.getCommandStatusList());
cmdStatusReportHandler.onMessage(report, this);
assertTrue(logCapturer.getOutput().contains("firing event of type " +
"DeleteBlockCommandStatus"));
"Delete_Block_Status"));
assertTrue(logCapturer.getOutput().contains("firing event of type " +
"CloseContainerCommandStatus"));
"Close_Container_Command_Status"));
assertTrue(logCapturer.getOutput().contains("firing event of type " +
"ReplicateCommandStatus"));
"Replicate_Command_Status"));
assertTrue(logCapturer.getOutput().contains("type: " +
"closeContainerCommand"));

View File

@ -127,7 +127,7 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setContainerReport(containerReport)
.setCommandStatusReport(commandStatusReport)
.addCommandStatusReports(commandStatusReport)
.build();
dispatcher.dispatch(heartbeat);
Assert.assertEquals(2, eventReceived.get());

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.junit.Assert.fail;
@ -188,6 +189,8 @@ public void testBlockDeletionTransactions() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
3000,
TimeUnit.MILLISECONDS);

View File

@ -27,6 +27,7 @@
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdfs.DFSUtil;
@ -46,13 +47,14 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.ozShell.TestOzoneShell;
import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.utils.MetadataStore;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.event.Level;
import java.io.File;
import java.io.IOException;
@ -64,8 +66,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.lang.Math.max;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds
.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
@ -76,14 +83,16 @@ public class TestBlockDeletion {
private static OzoneConfiguration conf = null;
private static ObjectStore store;
private static MiniOzoneCluster cluster = null;
private static ContainerSet dnContainerSet = null;
private static StorageContainerManager scm = null;
private static OzoneManager om = null;
private static Set<Long> containerIdsWithDeletedBlocks;
private static long maxTransactionId = 0;
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
GenericTestUtils.setLogLevel(DeletedBlockLogImpl.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(SCMBlockDeletingService.LOG, Level.DEBUG);
String path =
GenericTestUtils.getTempPath(TestOzoneShell.class.getSimpleName());
@ -94,6 +103,11 @@ public static void init() throws Exception {
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
3, TimeUnit.SECONDS);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(1)
@ -101,15 +115,12 @@ public static void init() throws Exception {
.build();
cluster.waitForClusterToBeReady();
store = OzoneClientFactory.getRpcClient(conf).getObjectStore();
dnContainerSet = cluster.getHddsDatanodes().get(0)
.getDatanodeStateMachine().getContainer().getContainerSet();
om = cluster.getOzoneManager();
scm = cluster.getStorageContainerManager();
containerIdsWithDeletedBlocks = new HashSet<>();
}
@Test(timeout = 60000)
@Ignore("Until delete background service is fixed.")
@Test
public void testBlockDeletion()
throws IOException, InterruptedException, TimeoutException {
String volumeName = UUID.randomUUID().toString();
@ -154,7 +165,9 @@ public void testBlockDeletion()
Assert
.assertTrue(
OzoneTestUtils.closeContainers(omKeyLocationInfoGroupList, scm));
Thread.sleep(5000);
waitForDatanodeCommandRetry();
waitForDatanodeBlockDeletionStart();
// The blocks should be deleted in the DN.
Assert.assertTrue(verifyBlocksDeleted(omKeyLocationInfoGroupList));
@ -169,11 +182,52 @@ public void testBlockDeletion()
matchContainerTransactionIds();
// verify PENDING_DELETE_STATUS event is fired
verifyBlockDeletionEvent();
verifyPendingDeleteEvent();
// Verify transactions committed
verifyTransactionsCommitted();
}
private void verifyBlockDeletionEvent()
private void waitForDatanodeBlockDeletionStart()
throws TimeoutException, InterruptedException {
LogCapturer logCapturer =
LogCapturer.captureLogs(DeleteBlocksCommandHandler.LOG);
logCapturer.clearOutput();
GenericTestUtils.waitFor(() -> logCapturer.getOutput()
.contains("Start to delete container block"),
500, 10000);
Thread.sleep(1000);
}
/**
* Waits for datanode command to be retried when datanode is dead.
*/
private void waitForDatanodeCommandRetry()
throws TimeoutException, InterruptedException {
cluster.shutdownHddsDatanode(0);
LogCapturer logCapturer =
LogCapturer.captureLogs(RetriableDatanodeEventWatcher.LOG);
logCapturer.clearOutput();
GenericTestUtils.waitFor(() -> logCapturer.getOutput()
.contains("RetriableDatanodeCommand type=deleteBlocksCommand"),
500, 5000);
cluster.restartHddsDatanode(0);
}
private void verifyTransactionsCommitted() throws IOException {
DeletedBlockLogImpl deletedBlockLog =
(DeletedBlockLogImpl) scm.getScmBlockManager().getDeletedBlockLog();
for (int txnID = 1; txnID <= maxTransactionId; txnID++) {
Assert.assertNull(
deletedBlockLog.getDeletedStore().get(Longs.toByteArray(txnID)));
}
}
private void verifyPendingDeleteEvent()
throws IOException, InterruptedException {
ContainerSet dnContainerSet =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet();
LogCapturer logCapturer =
LogCapturer.captureLogs(SCMBlockDeletingService.LOG);
// Create dummy container reports with deleteTransactionId set as 0
@ -209,6 +263,9 @@ private void verifyBlockDeletionEvent()
}
private void matchContainerTransactionIds() throws IOException {
ContainerSet dnContainerSet =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet();
List<ContainerData> containerDataList = new ArrayList<>();
dnContainerSet.listContainer(0, 10000, containerDataList);
for (ContainerData containerData : containerDataList) {
@ -216,6 +273,8 @@ private void matchContainerTransactionIds() throws IOException {
if (containerIdsWithDeletedBlocks.contains(containerId)) {
Assert.assertTrue(
scm.getContainerInfo(containerId).getDeleteTransactionId() > 0);
maxTransactionId = max(maxTransactionId,
scm.getContainerInfo(containerId).getDeleteTransactionId());
} else {
Assert.assertEquals(
scm.getContainerInfo(containerId).getDeleteTransactionId(), 0);
@ -230,6 +289,9 @@ private void matchContainerTransactionIds() throws IOException {
private boolean verifyBlocksCreated(
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups)
throws IOException {
ContainerSet dnContainerSet =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet();
return OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
try {
MetadataStore db = BlockUtils.getDB((KeyValueContainerData)
@ -245,6 +307,9 @@ private boolean verifyBlocksCreated(
private boolean verifyBlocksDeleted(
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups)
throws IOException {
ContainerSet dnContainerSet =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet();
return OzoneTestUtils.performOperationOnKeyContainers((blockID) -> {
try {
MetadataStore db = BlockUtils.getDB((KeyValueContainerData)