HDDS-187. Command status publisher for datanode.

Contributed by Ajay Kumar.
This commit is contained in:
Anu Engineer 2018-07-12 21:34:32 -07:00
parent 87eeb26e72
commit f89e265905
26 changed files with 934 additions and 140 deletions

View File

@ -17,7 +17,15 @@
*/
package org.apache.hadoop.hdds;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
/**
* Config class for HDDS.
*/
public final class HddsConfigKeys {
private HddsConfigKeys() {
}
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
"hdds.command.status.report.interval";
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_DEFAULT;
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
/**
* HDDS Id generator.
*/
public final class HddsIdFactory {
private HddsIdFactory() {
}
private static final AtomicLong LONG_COUNTER = new AtomicLong(
System.currentTimeMillis());
/**
* Returns an incrementing long. This class doesn't
* persist initial value for long Id's, so incremental id's after restart
* may collide with previously generated Id's.
*
* @return long
*/
public static long getLongId() {
return LONG_COUNTER.incrementAndGet();
}
/**
* Returns a uuid.
*
* @return UUID.
*/
public static UUID getUUId() {
return UUID.randomUUID();
}
}

View File

@ -1061,4 +1061,13 @@
</description>
</property>
<property>
<name>hdds.command.status.report.interval</name>
<value>30s</value>
<tag>OZONE, DATANODE, MANAGEMENT</tag>
<description>Time interval of the datanode to send status of commands
executed since last report. Unit could be defined with
postfix (ns,ms,s,m,h,d)</description>
</property>
</configuration>

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.hdds.HddsIdFactory;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import org.junit.Assert;
import org.junit.Test;
/**
* Test the JMX interface for the rocksdb metastore implementation.
*/
public class TestHddsIdFactory {
private static final Set<Long> ID_SET = ConcurrentHashMap.newKeySet();
private static final int IDS_PER_THREAD = 10000;
private static final int NUM_OF_THREADS = 5;
@After
public void cleanup() {
ID_SET.clear();
}
@Test
public void testGetLongId() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(5);
List<Callable<Integer>> tasks = new ArrayList<>(5);
addTasks(tasks);
List<Future<Integer>> result = executor.invokeAll(tasks);
assertEquals(IDS_PER_THREAD * NUM_OF_THREADS, ID_SET.size());
for (Future<Integer> r : result) {
assertEquals(r.get().intValue(), IDS_PER_THREAD);
}
}
private void addTasks(List<Callable<Integer>> tasks) {
for (int i = 0; i < NUM_OF_THREADS; i++) {
Callable<Integer> task = () -> {
for (int idNum = 0; idNum < IDS_PER_THREAD; idNum++) {
long var = HddsIdFactory.getLongId();
if (ID_SET.contains(var)) {
Assert.fail("Duplicate id found");
}
ID_SET.add(var);
}
return IDS_PER_THREAD;
};
tasks.add(task);
}
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.report;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
/**
* Publishes CommandStatusReport which will be sent to SCM as part of
* heartbeat. CommandStatusReport consist of the following information:
* - type : type of command.
* - status : status of command execution (PENDING, EXECUTED, FAILURE).
* - cmdId : Command id.
* - msg : optional message.
*/
public class CommandStatusReportPublisher extends
ReportPublisher<CommandStatusReportsProto> {
private long cmdStatusReportInterval = -1;
@Override
protected long getReportFrequency() {
if (cmdStatusReportInterval == -1) {
cmdStatusReportInterval = getConf().getTimeDuration(
HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,
HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
}
return cmdStatusReportInterval;
}
@Override
protected CommandStatusReportsProto getReport() {
Map<Long, CommandStatus> map = this.getContext()
.getCommandStatusMap();
Iterator<Long> iterator = map.keySet().iterator();
CommandStatusReportsProto.Builder builder = CommandStatusReportsProto
.newBuilder();
iterator.forEachRemaining(key -> {
CommandStatus cmdStatus = map.get(key);
builder.addCmdStatus(cmdStatus.getProtoBufMessage());
// If status is still pending then don't remove it from map as
// CommandHandler will change its status when it works on this command.
if (!cmdStatus.getStatus().equals(Status.PENDING)) {
map.remove(key);
}
});
return builder.build();
}
}

View File

@ -93,4 +93,13 @@ private void publishReport() {
*/
protected abstract T getReport();
/**
* Returns {@link StateContext}.
*
* @return stateContext report
*/
protected StateContext getContext() {
return context;
}
}

View File

@ -19,6 +19,8 @@
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@ -49,6 +51,8 @@ public ReportPublisherFactory(Configuration conf) {
report2publisher.put(NodeReportProto.class, NodeReportPublisher.class);
report2publisher.put(ContainerReportsProto.class,
ContainerReportPublisher.class);
report2publisher.put(CommandStatusReportsProto.class,
CommandStatusReportPublisher.class);
}
/**

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@ -107,6 +108,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
.setStateContext(context)
.addPublisherFor(NodeReportProto.class)
.addPublisherFor(ContainerReportsProto.class)
.addPublisherFor(CommandStatusReportsProto.class)
.build();
}

View File

@ -17,12 +17,17 @@
package org.apache.hadoop.ozone.container.common.statemachine;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,6 +53,7 @@ public class StateContext {
static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
private final Queue<SCMCommand> commandQueue;
private final Map<Long, CommandStatus> cmdStatusMap;
private final Lock lock;
private final DatanodeStateMachine parent;
private final AtomicLong stateExecutionCount;
@ -68,6 +74,7 @@ public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
this.state = state;
this.parent = parent;
commandQueue = new LinkedList<>();
cmdStatusMap = new ConcurrentHashMap<>();
reports = new LinkedList<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
@ -269,6 +276,7 @@ public void addCommand(SCMCommand command) {
} finally {
lock.unlock();
}
this.addCmdStatus(command);
}
/**
@ -279,4 +287,66 @@ public long getExecutionCount() {
return stateExecutionCount.get();
}
/**
* Returns the next {@link CommandStatus} or null if it is empty.
*
* @return {@link CommandStatus} or Null.
*/
public CommandStatus getCmdStatus(Long key) {
return cmdStatusMap.get(key);
}
/**
* Adds a {@link CommandStatus} to the State Machine.
*
* @param status - {@link CommandStatus}.
*/
public void addCmdStatus(Long key, CommandStatus status) {
cmdStatusMap.put(key, status);
}
/**
* Adds a {@link CommandStatus} to the State Machine for given SCMCommand.
*
* @param cmd - {@link SCMCommand}.
*/
public void addCmdStatus(SCMCommand cmd) {
this.addCmdStatus(cmd.getCmdId(),
CommandStatusBuilder.newBuilder()
.setCmdId(cmd.getCmdId())
.setStatus(Status.PENDING)
.setType(cmd.getType())
.build());
}
/**
* Get map holding all {@link CommandStatus} objects.
*
*/
public Map<Long, CommandStatus> getCommandStatusMap() {
return cmdStatusMap;
}
/**
* Remove object from cache in StateContext#cmdStatusMap.
*
*/
public void removeCommandStatus(Long cmdId) {
cmdStatusMap.remove(cmdId);
}
/**
* Updates status of a pending status command.
* @param cmdId command id
* @param cmdExecuted SCMCommand
* @return true if command status updated successfully else false.
*/
public boolean updateCommandStatus(Long cmdId, boolean cmdExecuted) {
if(cmdStatusMap.containsKey(cmdId)) {
cmdStatusMap.get(cmdId)
.setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
return true;
}
return false;
}
}

View File

@ -41,6 +41,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
LoggerFactory.getLogger(CloseContainerCommandHandler.class);
private int invocationCount;
private long totalTime;
private boolean cmdExecuted;
/**
* Constructs a ContainerReport handler.
@ -61,6 +62,7 @@ public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.debug("Processing Close Container command.");
invocationCount++;
cmdExecuted = false;
long startTime = Time.monotonicNow();
// TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
long containerID = -1;
@ -88,10 +90,11 @@ public void handle(SCMCommand command, OzoneContainer container,
// submit the close container request for the XceiverServer to handle
container.submitContainerRequest(
request.build(), replicationType);
cmdExecuted = true;
} catch (Exception e) {
LOG.error("Can't close container " + containerID, e);
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
/**
* Generic interface for handlers.
@ -58,4 +59,14 @@ void handle(SCMCommand command, OzoneContainer container,
*/
long getAverageRunTime();
/**
* Default implementation for updating command status.
*/
default void updateCommandStatus(StateContext context, SCMCommand command,
boolean cmdExecuted, Logger log) {
if (!context.updateCommandStatus(command.getCmdId(), cmdExecuted)) {
log.debug("{} with cmdId:{} not found.", command.getType(),
command.getCmdId());
}
}
}

View File

@ -21,7 +21,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@ -54,7 +55,8 @@
import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_NOT_FOUND;
/**
* Handle block deletion commands.
@ -68,6 +70,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
private final Configuration conf;
private int invocationCount;
private long totalTime;
private boolean cmdExecuted;
public DeleteBlocksCommandHandler(ContainerSet cset,
Configuration conf) {
@ -78,93 +81,98 @@ public DeleteBlocksCommandHandler(ContainerSet cset,
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
LOG.warn("Skipping handling command, expected command "
+ "type {} but found {}",
SCMCommandProto.Type.deleteBlocksCommand, command.getType());
return;
}
LOG.debug("Processing block deletion command.");
invocationCount++;
cmdExecuted = false;
long startTime = Time.monotonicNow();
// move blocks to deleting state.
// this is a metadata update, the actual deletion happens in another
// recycling thread.
DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
DeletedContainerBlocksSummary summary =
DeletedContainerBlocksSummary.getFrom(containerBlocks);
LOG.info("Start to delete container blocks, TXIDs={}, "
+ "numOfContainers={}, numOfBlocks={}",
summary.getTxIDSummary(),
summary.getNumOfContainers(),
summary.getNumOfBlocks());
ContainerBlocksDeletionACKProto.Builder resultBuilder =
ContainerBlocksDeletionACKProto.newBuilder();
containerBlocks.forEach(entry -> {
DeleteBlockTransactionResult.Builder txResultBuilder =
DeleteBlockTransactionResult.newBuilder();
txResultBuilder.setTxID(entry.getTxID());
try {
long containerId = entry.getContainerID();
Container cont = containerSet.getContainer(containerId);
if(cont == null) {
throw new StorageContainerException("Unable to find the container "
+ containerId, CONTAINER_NOT_FOUND);
}
ContainerProtos.ContainerType containerType = cont.getContainerType();
switch (containerType) {
case KeyValueContainer:
KeyValueContainerData containerData = (KeyValueContainerData)
cont.getContainerData();
deleteKeyValueContainerBlocks(containerData, entry);
txResultBuilder.setSuccess(true);
break;
default:
LOG.error("Delete Blocks Command Handler is not implemented for " +
"containerType {}", containerType);
}
} catch (IOException e) {
LOG.warn("Failed to delete blocks for container={}, TXID={}",
entry.getContainerID(), entry.getTxID(), e);
txResultBuilder.setSuccess(false);
try {
if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
LOG.warn("Skipping handling command, expected command "
+ "type {} but found {}",
SCMCommandProto.Type.deleteBlocksCommand, command.getType());
return;
}
resultBuilder.addResults(txResultBuilder.build());
});
ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
LOG.debug("Processing block deletion command.");
invocationCount++;
// Send ACK back to SCM as long as meta updated
// TODO Or we should wait until the blocks are actually deleted?
if (!containerBlocks.isEmpty()) {
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
// move blocks to deleting state.
// this is a metadata update, the actual deletion happens in another
// recycling thread.
DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
DeletedContainerBlocksSummary summary =
DeletedContainerBlocksSummary.getFrom(containerBlocks);
LOG.info("Start to delete container blocks, TXIDs={}, "
+ "numOfContainers={}, numOfBlocks={}",
summary.getTxIDSummary(),
summary.getNumOfContainers(),
summary.getNumOfBlocks());
ContainerBlocksDeletionACKProto.Builder resultBuilder =
ContainerBlocksDeletionACKProto.newBuilder();
containerBlocks.forEach(entry -> {
DeleteBlockTransactionResult.Builder txResultBuilder =
DeleteBlockTransactionResult.newBuilder();
txResultBuilder.setTxID(entry.getTxID());
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending following block deletion ACK to SCM");
for (DeleteBlockTransactionResult result :
blockDeletionACK.getResultsList()) {
LOG.debug(result.getTxID() + " : " + result.getSuccess());
}
long containerId = entry.getContainerID();
Container cont = containerSet.getContainer(containerId);
if (cont == null) {
throw new StorageContainerException("Unable to find the container "
+ containerId, CONTAINER_NOT_FOUND);
}
ContainerProtos.ContainerType containerType = cont.getContainerType();
switch (containerType) {
case KeyValueContainer:
KeyValueContainerData containerData = (KeyValueContainerData)
cont.getContainerData();
deleteKeyValueContainerBlocks(containerData, entry);
txResultBuilder.setSuccess(true);
break;
default:
LOG.error(
"Delete Blocks Command Handler is not implemented for " +
"containerType {}", containerType);
}
endPoint.getEndPoint()
.sendContainerBlocksDeletionACK(blockDeletionACK);
} catch (IOException e) {
LOG.error("Unable to send block deletion ACK to SCM {}",
endPoint.getAddress().toString(), e);
LOG.warn("Failed to delete blocks for container={}, TXID={}",
entry.getContainerID(), entry.getTxID(), e);
txResultBuilder.setSuccess(false);
}
resultBuilder.addResults(txResultBuilder.build());
});
ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
// Send ACK back to SCM as long as meta updated
// TODO Or we should wait until the blocks are actually deleted?
if (!containerBlocks.isEmpty()) {
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending following block deletion ACK to SCM");
for (DeleteBlockTransactionResult result :
blockDeletionACK.getResultsList()) {
LOG.debug(result.getTxID() + " : " + result.getSuccess());
}
}
endPoint.getEndPoint()
.sendContainerBlocksDeletionACK(blockDeletionACK);
} catch (IOException e) {
LOG.error("Unable to send block deletion ACK to SCM {}",
endPoint.getAddress().toString(), e);
}
}
}
cmdExecuted = true;
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
/**
* Move a bunch of blocks from a container to deleting state.
* This is a meta update, the actual deletes happen in async mode.
* Move a bunch of blocks from a container to deleting state. This is a meta
* update, the actual deletes happen in async mode.
*
* @param containerData - KeyValueContainerData
* @param delTX a block deletion transaction.
@ -222,7 +230,7 @@ private void deleteKeyValueContainerBlocks(
}
} else {
LOG.debug("Block {} not found or already under deletion in"
+ " container {}, skip deleting it.", blk, containerId);
+ " container {}, skip deleting it.", blk, containerId);
}
}

View File

@ -39,12 +39,17 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
private int invocationCount;
private long totalTime;
private boolean cmdExecuted;
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.warn("Replicate command is not yet handled");
try {
cmdExecuted = true;
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
}
}
@Override

View File

@ -1,19 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
@ -24,7 +23,6 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
/**
* Asks datanode to close a container.
*/
@ -36,6 +34,15 @@ public class CloseContainerCommand
public CloseContainerCommand(long containerID,
HddsProtos.ReplicationType replicationType) {
super();
this.containerID = containerID;
this.replicationType = replicationType;
}
// Should be called only for protobuf conversion
private CloseContainerCommand(long containerID,
HddsProtos.ReplicationType replicationType, long cmdId) {
super(cmdId);
this.containerID = containerID;
this.replicationType = replicationType;
}
@ -63,6 +70,7 @@ public byte[] getProtoBufMessage() {
public CloseContainerCommandProto getProto() {
return CloseContainerCommandProto.newBuilder()
.setContainerID(containerID)
.setCmdId(getCmdId())
.setReplicationType(replicationType).build();
}
@ -70,8 +78,8 @@ public static CloseContainerCommand getFromProtobuf(
CloseContainerCommandProto closeContainerProto) {
Preconditions.checkNotNull(closeContainerProto);
return new CloseContainerCommand(closeContainerProto.getContainerID(),
closeContainerProto.getReplicationType());
closeContainerProto.getReplicationType(), closeContainerProto
.getCmdId());
}
public long getContainerID() {

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
/**
* A class that is used to communicate status of datanode commands.
*/
public class CommandStatus {
private SCMCommandProto.Type type;
private Long cmdId;
private Status status;
private String msg;
public Type getType() {
return type;
}
public Long getCmdId() {
return cmdId;
}
public Status getStatus() {
return status;
}
public String getMsg() {
return msg;
}
/**
* To allow change of status once commandStatus is initialized.
*
* @param status
*/
public void setStatus(Status status) {
this.status = status;
}
/**
* Returns a CommandStatus from the protocol buffers.
*
* @param cmdStatusProto - protoBuf Message
* @return CommandStatus
*/
public CommandStatus getFromProtoBuf(
StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) {
return CommandStatusBuilder.newBuilder()
.setCmdId(cmdStatusProto.getCmdId())
.setStatus(cmdStatusProto.getStatus())
.setType(cmdStatusProto.getType())
.setMsg(cmdStatusProto.getMsg()).build();
}
/**
* Returns a CommandStatus from the protocol buffers.
*
* @return StorageContainerDatanodeProtocolProtos.CommandStatus
*/
public StorageContainerDatanodeProtocolProtos.CommandStatus
getProtoBufMessage() {
StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder =
StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder()
.setCmdId(this.getCmdId())
.setStatus(this.getStatus())
.setType(this.getType());
if (this.getMsg() != null) {
builder.setMsg(this.getMsg());
}
return builder.build();
}
/**
* Builder class for CommandStatus.
*/
public static final class CommandStatusBuilder {
private SCMCommandProto.Type type;
private Long cmdId;
private StorageContainerDatanodeProtocolProtos.CommandStatus.Status status;
private String msg;
private CommandStatusBuilder() {
}
public static CommandStatusBuilder newBuilder() {
return new CommandStatusBuilder();
}
public CommandStatusBuilder setType(Type type) {
this.type = type;
return this;
}
public CommandStatusBuilder setCmdId(Long cmdId) {
this.cmdId = cmdId;
return this;
}
public CommandStatusBuilder setStatus(Status status) {
this.status = status;
return this;
}
public CommandStatusBuilder setMsg(String msg) {
this.msg = msg;
return this;
}
public CommandStatus build() {
CommandStatus commandStatus = new CommandStatus();
commandStatus.type = this.type;
commandStatus.msg = this.msg;
commandStatus.status = this.status;
commandStatus.cmdId = this.cmdId;
return commandStatus;
}
}
}

View File

@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -36,6 +36,14 @@ public class DeleteBlocksCommand extends
public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) {
super();
this.blocksTobeDeleted = blocks;
}
// Should be called only for protobuf conversion
private DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks,
long cmdId) {
super(cmdId);
this.blocksTobeDeleted = blocks;
}
@ -56,11 +64,12 @@ public byte[] getProtoBufMessage() {
public static DeleteBlocksCommand getFromProtobuf(
DeleteBlocksCommandProto deleteBlocksProto) {
return new DeleteBlocksCommand(deleteBlocksProto
.getDeletedBlocksTransactionsList());
.getDeletedBlocksTransactionsList(), deleteBlocksProto.getCmdId());
}
public DeleteBlocksCommandProto getProto() {
return DeleteBlocksCommandProto.newBuilder()
.setCmdId(getCmdId())
.addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
}
}

View File

@ -30,7 +30,6 @@
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import com.google.common.base.Preconditions;
@ -41,11 +40,19 @@ public class ReplicateContainerCommand
extends SCMCommand<ReplicateContainerCommandProto> {
private final long containerID;
private final List<DatanodeDetails> sourceDatanodes;
public ReplicateContainerCommand(long containerID,
List<DatanodeDetails> sourceDatanodes) {
super();
this.containerID = containerID;
this.sourceDatanodes = sourceDatanodes;
}
// Should be called only for protobuf conversion
public ReplicateContainerCommand(long containerID,
List<DatanodeDetails> sourceDatanodes, long cmdId) {
super(cmdId);
this.containerID = containerID;
this.sourceDatanodes = sourceDatanodes;
}
@ -62,6 +69,7 @@ public byte[] getProtoBufMessage() {
public ReplicateContainerCommandProto getProto() {
Builder builder = ReplicateContainerCommandProto.newBuilder()
.setCmdId(getCmdId())
.setContainerID(containerID);
for (DatanodeDetails dd : sourceDatanodes) {
builder.addSources(dd.getProtoBufMessage());
@ -75,12 +83,12 @@ public static ReplicateContainerCommand getFromProtobuf(
List<DatanodeDetails> datanodeDetails =
protoMessage.getSourcesList()
.stream()
.map(DatanodeDetails::getFromProtoBuf)
.collect(Collectors.toList());
.stream()
.map(DatanodeDetails::getFromProtoBuf)
.collect(Collectors.toList());
return new ReplicateContainerCommand(protoMessage.getContainerID(),
datanodeDetails);
datanodeDetails, protoMessage.getCmdId());
}

View File

@ -49,6 +49,16 @@ public byte[] getProtoBufMessage() {
return getProto().toByteArray();
}
/**
* Not implemented for ReregisterCommand.
*
* @return cmdId.
*/
@Override
public long getCmdId() {
return 0;
}
public ReregisterCommandProto getProto() {
return ReregisterCommandProto
.newBuilder()

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.protocol.commands;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.hdds.HddsIdFactory;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@ -27,6 +28,15 @@
* @param <T>
*/
public abstract class SCMCommand<T extends GeneratedMessage> {
private long cmdId;
SCMCommand() {
this.cmdId = HddsIdFactory.getLongId();
}
SCMCommand(long cmdId) {
this.cmdId = cmdId;
}
/**
* Returns the type of this command.
* @return Type
@ -38,4 +48,13 @@ public abstract class SCMCommand<T extends GeneratedMessage> {
* @return A protobuf message.
*/
public abstract byte[] getProtoBufMessage();
/**
* Gets the commandId of this object.
* @return uuid.
*/
public long getCmdId() {
return cmdId;
}
}

View File

@ -80,6 +80,7 @@ message SCMHeartbeatRequestProto {
optional NodeReportProto nodeReport = 2;
optional ContainerReportsProto containerReport = 3;
optional ContainerActionsProto containerActions = 4;
optional CommandStatusReportsProto commandStatusReport = 5;
}
/*
@ -127,6 +128,22 @@ message ContainerReportsProto {
repeated ContainerInfo reports = 1;
}
message CommandStatusReportsProto {
repeated CommandStatus cmdStatus = 1;
}
message CommandStatus {
enum Status {
PENDING = 1;
EXECUTED = 2;
FAILED = 3;
}
required int64 cmdId = 1;
required Status status = 2 [default = PENDING];
required SCMCommandProto.Type type = 3;
optional string msg = 4;
}
message ContainerActionsProto {
repeated ContainerAction containerActions = 1;
}
@ -193,6 +210,7 @@ message ReregisterCommandProto {}
// HB response from SCM, contains a list of block deletion transactions.
message DeleteBlocksCommandProto {
repeated DeletedBlocksTransaction deletedBlocksTransactions = 1;
required int64 cmdId = 3;
}
// The deleted blocks which are stored in deletedBlock.db of scm.
@ -226,6 +244,7 @@ This command asks the datanode to close a specific container.
message CloseContainerCommandProto {
required int64 containerID = 1;
required hadoop.hdds.ReplicationType replicationType = 2;
required int64 cmdId = 3;
}
/**
@ -233,6 +252,7 @@ This command asks the datanode to delete a specific container.
*/
message DeleteContainerCommandProto {
required int64 containerID = 1;
required int64 cmdId = 2;
}
/**
@ -241,6 +261,7 @@ This command asks the datanode to replicate a container from specific sources.
message ReplicateContainerCommandProto {
required int64 containerID = 1;
repeated DatanodeDetailsProto sources = 2;
required int64 cmdId = 3;
}
/**

View File

@ -18,6 +18,8 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
@ -59,6 +61,9 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers =
new HashMap();
private Map<DatanodeDetails, NodeReportProto> nodeReports = new HashMap<>();
private AtomicInteger commandStatusReport = new AtomicInteger(0);
private List<CommandStatus> cmdStatusList = new LinkedList<>();
private List<SCMCommandProto> scmCommandRequests = new LinkedList<>();
/**
* Returns the number of heartbeats made to this class.
*
@ -180,10 +185,12 @@ private void sleepIfNeeded() {
sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException {
rpcCount.incrementAndGet();
heartbeatCount.incrementAndGet();
if(heartbeat.hasCommandStatusReport()){
cmdStatusList.addAll(heartbeat.getCommandStatusReport().getCmdStatusList());
commandStatusReport.incrementAndGet();
}
sleepIfNeeded();
List<SCMCommandProto>
cmdResponses = new LinkedList<>();
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(scmCommandRequests)
.setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
.build();
}
@ -302,4 +309,24 @@ public void reset() {
nodeContainers.clear();
}
public int getCommandStatusReportCount() {
return commandStatusReport.get();
}
public List<CommandStatus> getCmdStatusList() {
return cmdStatusList;
}
public List<SCMCommandProto> getScmCommandRequests() {
return scmCommandRequests;
}
public void clearScmCommandRequests() {
scmCommandRequests.clear();
}
public void addScmCommandRequest(SCMCommandProto scmCmd) {
scmCommandRequests.add(scmCmd);
}
}

View File

@ -20,18 +20,27 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsIdFactory;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@ -42,12 +51,20 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test cases to test {@link ReportPublisher}.
*/
public class TestReportPublisher {
private static Configuration config;
@BeforeClass
public static void setup() {
config = new OzoneConfiguration();
}
/**
* Dummy report publisher for testing.
*/
@ -93,9 +110,9 @@ public void testScheduledReport() throws InterruptedException {
.setNameFormat("Unit test ReportManager Thread - %d").build());
publisher.init(dummyContext, executorService);
Thread.sleep(150);
Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
Thread.sleep(150);
Assert.assertEquals(2, ((DummyReportPublisher)publisher).getReportCount);
Assert.assertEquals(2, ((DummyReportPublisher) publisher).getReportCount);
executorService.shutdown();
}
@ -110,11 +127,57 @@ public void testPublishReport() throws InterruptedException {
publisher.init(dummyContext, executorService);
Thread.sleep(150);
executorService.shutdown();
Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
verify(dummyContext, times(1)).addReport(null);
}
@Test
public void testCommandStatusPublisher() throws InterruptedException {
StateContext dummyContext = Mockito.mock(StateContext.class);
ReportPublisher publisher = new CommandStatusReportPublisher();
final Map<Long, CommandStatus> cmdStatusMap = new ConcurrentHashMap<>();
when(dummyContext.getCommandStatusMap()).thenReturn(cmdStatusMap);
publisher.setConf(config);
ScheduledExecutorService executorService = HadoopExecutors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Unit test ReportManager Thread - %d").build());
publisher.init(dummyContext, executorService);
Assert.assertEquals(0,
((CommandStatusReportPublisher) publisher).getReport()
.getCmdStatusCount());
// Insert to status object to state context map and then get the report.
CommandStatus obj1 = CommandStatus.CommandStatusBuilder.newBuilder()
.setCmdId(HddsIdFactory.getLongId())
.setType(Type.deleteBlocksCommand)
.setStatus(Status.PENDING)
.build();
CommandStatus obj2 = CommandStatus.CommandStatusBuilder.newBuilder()
.setCmdId(HddsIdFactory.getLongId())
.setType(Type.closeContainerCommand)
.setStatus(Status.EXECUTED)
.build();
cmdStatusMap.put(obj1.getCmdId(), obj1);
cmdStatusMap.put(obj2.getCmdId(), obj2);
Assert.assertEquals("Should publish report with 2 status objects", 2,
((CommandStatusReportPublisher) publisher).getReport()
.getCmdStatusCount());
Assert.assertEquals(
"Next report should have 1 status objects as command status o"
+ "bjects are still in Pending state",
1, ((CommandStatusReportPublisher) publisher).getReport()
.getCmdStatusCount());
Assert.assertTrue(
"Next report should have 1 status objects as command status "
+ "objects are still in Pending state",
((CommandStatusReportPublisher) publisher).getReport()
.getCmdStatusList().get(0).getStatus().equals(Status.PENDING));
executorService.shutdown();
}
@Test
public void testAddingReportToHeartbeat() {
Configuration conf = new OzoneConfiguration();
@ -168,10 +231,10 @@ private static DatanodeDetails getDatanodeDetails() {
* Adds the report to heartbeat.
*
* @param requestBuilder builder to which the report has to be added.
* @param report the report to be added.
* @param report the report to be added.
*/
private static void addReport(SCMHeartbeatRequestProto.Builder requestBuilder,
GeneratedMessage report) {
private static void addReport(SCMHeartbeatRequestProto.Builder
requestBuilder, GeneratedMessage report) {
String reportName = report.getDescriptorForType().getFullName();
for (Descriptors.FieldDescriptor descriptor :
SCMHeartbeatRequestProto.getDescriptor().getFields()) {

View File

@ -21,8 +21,12 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.CommandStatusReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
@ -34,47 +38,54 @@
public final class SCMEvents {
/**
* NodeReports are sent out by Datanodes. This report is
* received by SCMDatanodeHeartbeatDispatcher and NodeReport Event is
* generated.
* NodeReports are sent out by Datanodes. This report is received by
* SCMDatanodeHeartbeatDispatcher and NodeReport Event is generated.
*/
public static final TypedEvent<NodeReportFromDatanode> NODE_REPORT =
new TypedEvent<>(NodeReportFromDatanode.class, "Node_Report");
/**
* ContainerReports are send out by Datanodes. This report
* is received by SCMDatanodeHeartbeatDispatcher and Container_Report Event
* i generated.
* ContainerReports are send out by Datanodes. This report is received by
* SCMDatanodeHeartbeatDispatcher and Container_Report Event
* isTestSCMDatanodeHeartbeatDispatcher generated.
*/
public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
/**
* A Command status report will be sent by datanodes. This repoort is received
* by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
*/
public static final TypedEvent<CommandStatusReportFromDatanode>
CMD_STATUS_REPORT =
new TypedEvent<>(CommandStatusReportFromDatanode.class,
"Cmd_Status_Report");
/**
* When ever a command for the Datanode needs to be issued by any component
* inside SCM, a Datanode_Command event is generated. NodeManager listens
* to these events and dispatches them to Datanode for further processing.
* inside SCM, a Datanode_Command event is generated. NodeManager listens to
* these events and dispatches them to Datanode for further processing.
*/
public static final Event<CommandForDatanode> DATANODE_COMMAND =
new TypedEvent<>(CommandForDatanode.class, "Datanode_Command");
/**
* A Close Container Event can be triggered under many condition.
* Some of them are:
* 1. A Container is full, then we stop writing further information to
* that container. DN's let SCM know that current state and sends a
* informational message that allows SCM to close the container.
*
* 2. If a pipeline is open; for example Ratis; if a single node fails,
* we will proactively close these containers.
*
* Once a command is dispatched to DN, we will also listen to updates from
* the datanode which lets us know that this command completed or timed out.
* A Close Container Event can be triggered under many condition. Some of them
* are: 1. A Container is full, then we stop writing further information to
* that container. DN's let SCM know that current state and sends a
* informational message that allows SCM to close the container.
* <p>
* 2. If a pipeline is open; for example Ratis; if a single node fails, we
* will proactively close these containers.
* <p>
* Once a command is dispatched to DN, we will also listen to updates from the
* datanode which lets us know that this command completed or timed out.
*/
public static final TypedEvent<ContainerID> CLOSE_CONTAINER =
new TypedEvent<>(ContainerID.class, "Close_Container");
/**
* This event will be triggered whenever a new datanode is
* registered with SCM.
* This event will be triggered whenever a new datanode is registered with
* SCM.
*/
public static final TypedEvent<DatanodeDetails> NEW_NODE =
new TypedEvent<>(DatanodeDetails.class, "New_Node");

View File

@ -19,6 +19,8 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@ -37,7 +39,7 @@
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
/**
* This class is responsible for dispatching heartbeat from datanode to
* appropriate EventHandler at SCM.
@ -86,6 +88,13 @@ public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
heartbeat.getContainerReport()));
}
if (heartbeat.hasCommandStatusReport()) {
eventPublisher.fireEvent(CMD_STATUS_REPORT,
new CommandStatusReportFromDatanode(datanodeDetails,
heartbeat.getCommandStatusReport()));
}
return commands;
}
@ -136,4 +145,16 @@ public ContainerReportFromDatanode(DatanodeDetails datanodeDetails,
}
}
/**
* Container report event payload with origin.
*/
public static class CommandStatusReportFromDatanode
extends ReportFromDatanode<CommandStatusReportsProto> {
public CommandStatusReportFromDatanode(DatanodeDetails datanodeDetails,
CommandStatusReportsProto report) {
super(datanodeDetails, report);
}
}
}

View File

@ -21,6 +21,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.scm.server.
SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@ -42,6 +46,7 @@
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
/**
* This class tests the behavior of SCMDatanodeHeartbeatDispatcher.
@ -91,6 +96,8 @@ public void testContainerReportDispatcher() throws IOException {
ContainerReportsProto containerReport =
ContainerReportsProto.getDefaultInstance();
CommandStatusReportsProto commandStatusReport =
CommandStatusReportsProto.getDefaultInstance();
SCMDatanodeHeartbeatDispatcher dispatcher =
new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class),
@ -98,9 +105,18 @@ public void testContainerReportDispatcher() throws IOException {
@Override
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
EVENT_TYPE event, PAYLOAD payload) {
Assert.assertEquals(event, CONTAINER_REPORT);
Assert.assertEquals(containerReport,
((ContainerReportFromDatanode)payload).getReport());
Assert.assertTrue(
event.equals(CONTAINER_REPORT)
|| event.equals(CMD_STATUS_REPORT));
if (payload instanceof ContainerReportFromDatanode) {
Assert.assertEquals(containerReport,
((ContainerReportFromDatanode) payload).getReport());
}
if (payload instanceof CommandStatusReportFromDatanode) {
Assert.assertEquals(commandStatusReport,
((CommandStatusReportFromDatanode) payload).getReport());
}
eventReceived.incrementAndGet();
}
});
@ -111,9 +127,10 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setContainerReport(containerReport)
.setCommandStatusReport(commandStatusReport)
.build();
dispatcher.dispatch(heartbeat);
Assert.assertEquals(1, eventReceived.get());
Assert.assertEquals(2, eventReceived.get());
}

View File

@ -16,12 +16,29 @@
*/
package org.apache.hadoop.ozone.container.common;
import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.DeleteBlocksCommandProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -54,6 +71,7 @@
import org.apache.hadoop.ozone.container.common.states.endpoint
.VersionEndpointTask;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
@ -74,6 +92,9 @@
.createEndpoint;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.when;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* Tests the endpoints.
@ -83,6 +104,7 @@ public class TestEndPoint {
private static RPC.Server scmServer;
private static ScmTestMock scmServerImpl;
private static File testDir;
private static Configuration config;
@AfterClass
public static void tearDown() throws Exception {
@ -99,6 +121,12 @@ public static void setUp() throws Exception {
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
scmServerImpl, serverAddress, 10);
testDir = PathUtils.getTestDir(TestEndPoint.class);
config = SCMTestUtils.getConf();
config.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
config.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
config
.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
config.set(HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,"1s");
}
@Test
@ -312,7 +340,87 @@ public void testHeartbeat() throws Exception {
}
}
private void heartbeatTaskHelper(InetSocketAddress scmAddress,
@Test
public void testHeartbeatWithCommandStatusReport() throws Exception {
DatanodeDetails dataNode = getDatanodeDetails();
try (EndpointStateMachine rpcEndPoint =
createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
String storageId = UUID.randomUUID().toString();
// Add some scmCommands for heartbeat response
addScmCommands();
SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(dataNode.getProtoBufMessage())
.setNodeReport(TestUtils.createNodeReport(
getStorageReports(storageId)))
.build();
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
.sendHeartbeat(request);
assertNotNull(responseProto);
assertEquals(3, responseProto.getCommandsCount());
assertEquals(0, scmServerImpl.getCommandStatusReportCount());
// Send heartbeat again from heartbeat endpoint task
final StateContext stateContext = heartbeatTaskHelper(serverAddress, 3000);
Map<Long, CommandStatus> map = stateContext.getCommandStatusMap();
assertNotNull(map);
assertEquals("Should have 3 objects", 3, map.size());
assertTrue(map.containsKey(Long.valueOf(1)));
assertTrue(map.containsKey(Long.valueOf(2)));
assertTrue(map.containsKey(Long.valueOf(3)));
assertTrue(map.get(Long.valueOf(1)).getType()
.equals(Type.closeContainerCommand));
assertTrue(map.get(Long.valueOf(2)).getType()
.equals(Type.replicateContainerCommand));
assertTrue(
map.get(Long.valueOf(3)).getType().equals(Type.deleteBlocksCommand));
assertTrue(map.get(Long.valueOf(1)).getStatus().equals(Status.PENDING));
assertTrue(map.get(Long.valueOf(2)).getStatus().equals(Status.PENDING));
assertTrue(map.get(Long.valueOf(3)).getStatus().equals(Status.PENDING));
scmServerImpl.clearScmCommandRequests();
}
}
private void addScmCommands() {
SCMCommandProto closeCommand = SCMCommandProto.newBuilder()
.setCloseContainerCommandProto(
CloseContainerCommandProto.newBuilder().setCmdId(1)
.setContainerID(1)
.setReplicationType(ReplicationType.RATIS)
.build())
.setCommandType(Type.closeContainerCommand)
.build();
SCMCommandProto replicationCommand = SCMCommandProto.newBuilder()
.setReplicateContainerCommandProto(
ReplicateContainerCommandProto.newBuilder()
.setCmdId(2)
.setContainerID(2)
.build())
.setCommandType(Type.replicateContainerCommand)
.build();
SCMCommandProto deleteBlockCommand = SCMCommandProto.newBuilder()
.setDeleteBlocksCommandProto(
DeleteBlocksCommandProto.newBuilder()
.setCmdId(3)
.addDeletedBlocksTransactions(
DeletedBlocksTransaction.newBuilder()
.setContainerID(45)
.setCount(1)
.setTxID(23)
.build())
.build())
.setCommandType(Type.deleteBlocksCommand)
.build();
scmServerImpl.addScmCommandRequest(closeCommand);
scmServerImpl.addScmCommandRequest(deleteBlockCommand);
scmServerImpl.addScmCommandRequest(replicationCommand);
}
private StateContext heartbeatTaskHelper(InetSocketAddress scmAddress,
int rpcTimeout) throws Exception {
Configuration conf = SCMTestUtils.getConf();
conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
@ -344,6 +452,7 @@ private void heartbeatTaskHelper(InetSocketAddress scmAddress,
Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
rpcEndPoint.getState());
return stateContext;
}
}