HDDS-256. Adding CommandStatusReport Handler. Contributed by Ajay Kumar.
This commit is contained in:
parent
8a6bb8409c
commit
89a0f80741
@ -0,0 +1,129 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdds.scm.command;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.CommandStatus;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
||||
.CommandStatusReportFromDatanode;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.hdds.server.events.EventHandler;
|
||||
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Handles CommandStatusReports from datanode.
|
||||
*/
|
||||
public class CommandStatusReportHandler implements
|
||||
EventHandler<CommandStatusReportFromDatanode> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory
|
||||
.getLogger(CommandStatusReportHandler.class);
|
||||
|
||||
@Override
|
||||
public void onMessage(CommandStatusReportFromDatanode report,
|
||||
EventPublisher publisher) {
|
||||
Preconditions.checkNotNull(report);
|
||||
List<CommandStatus> cmdStatusList = report.getReport().getCmdStatusList();
|
||||
Preconditions.checkNotNull(cmdStatusList);
|
||||
LOGGER.trace("Processing command status report for dn: {}", report
|
||||
.getDatanodeDetails());
|
||||
|
||||
// Route command status to its watchers.
|
||||
cmdStatusList.forEach(cmdStatus -> {
|
||||
LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
|
||||
.getCmdId(), cmdStatus.getType());
|
||||
switch (cmdStatus.getType()) {
|
||||
case replicateContainerCommand:
|
||||
publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
|
||||
ReplicationStatus(cmdStatus));
|
||||
break;
|
||||
case closeContainerCommand:
|
||||
publisher.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
|
||||
CloseContainerStatus(cmdStatus));
|
||||
break;
|
||||
case deleteBlocksCommand:
|
||||
publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new
|
||||
DeleteBlockCommandStatus(cmdStatus));
|
||||
break;
|
||||
default:
|
||||
LOGGER.debug("CommandStatus of type:{} not handled in " +
|
||||
"CommandStatusReportHandler.", cmdStatus.getType());
|
||||
break;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper event for CommandStatus.
|
||||
*/
|
||||
public static class CommandStatusEvent implements IdentifiableEventPayload {
|
||||
private CommandStatus cmdStatus;
|
||||
|
||||
CommandStatusEvent(CommandStatus cmdStatus) {
|
||||
this.cmdStatus = cmdStatus;
|
||||
}
|
||||
|
||||
public CommandStatus getCmdStatus() {
|
||||
return cmdStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CommandStatusEvent:" + cmdStatus.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getId() {
|
||||
return cmdStatus.getCmdId();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper event for Replicate Command.
|
||||
*/
|
||||
public static class ReplicationStatus extends CommandStatusEvent {
|
||||
ReplicationStatus(CommandStatus cmdStatus) {
|
||||
super(cmdStatus);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper event for CloseContainer Command.
|
||||
*/
|
||||
public static class CloseContainerStatus extends CommandStatusEvent {
|
||||
CloseContainerStatus(CommandStatus cmdStatus) {
|
||||
super(cmdStatus);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper event for DeleteBlock Command.
|
||||
*/
|
||||
public static class DeleteBlockCommandStatus extends CommandStatusEvent {
|
||||
DeleteBlockCommandStatus(CommandStatus cmdStatus) {
|
||||
super(cmdStatus);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
* <p>
|
||||
* This package contains HDDS protocol related classes.
|
||||
*/
|
||||
|
||||
/**
|
||||
* This package contains HDDS protocol related classes.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.command;
|
||||
/*
|
||||
* Classes related to commands issued from SCM to DataNode.
|
||||
* */
|
@ -20,6 +20,7 @@
|
||||
package org.apache.hadoop.hdds.scm.events;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.*;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
||||
.CommandStatusReportFromDatanode;
|
||||
@ -104,6 +105,29 @@ public final class SCMEvents {
|
||||
public static final TypedEvent<DatanodeDetails> DEAD_NODE =
|
||||
new TypedEvent<>(DatanodeDetails.class, "Dead_Node");
|
||||
|
||||
/**
|
||||
* This event will be triggered by CommandStatusReportHandler whenever a
|
||||
* status for Replication SCMCommand is received.
|
||||
*/
|
||||
public static final Event<ReplicationStatus> REPLICATION_STATUS = new
|
||||
TypedEvent<>(ReplicationStatus.class, "ReplicateCommandStatus");
|
||||
/**
|
||||
* This event will be triggered by CommandStatusReportHandler whenever a
|
||||
* status for CloseContainer SCMCommand is received.
|
||||
*/
|
||||
public static final Event<CloseContainerStatus>
|
||||
CLOSE_CONTAINER_STATUS =
|
||||
new TypedEvent<>(CloseContainerStatus.class,
|
||||
"CloseContainerCommandStatus");
|
||||
/**
|
||||
* This event will be triggered by CommandStatusReportHandler whenever a
|
||||
* status for DeleteBlock SCMCommand is received.
|
||||
*/
|
||||
public static final Event<DeleteBlockCommandStatus>
|
||||
DELETE_BLOCK_STATUS =
|
||||
new TypedEvent(DeleteBlockCommandStatus.class,
|
||||
"DeleteBlockCommandStatus");
|
||||
|
||||
/**
|
||||
* Private Ctor. Never Constructed.
|
||||
*/
|
||||
|
@ -33,6 +33,7 @@
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||
import org.apache.hadoop.hdds.scm.block.BlockManager;
|
||||
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
|
||||
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
|
||||
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
|
||||
@ -191,6 +192,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
|
||||
new NodeReportHandler(scmNodeManager);
|
||||
ContainerReportHandler containerReportHandler =
|
||||
new ContainerReportHandler(scmContainerManager, node2ContainerMap);
|
||||
CommandStatusReportHandler cmdStatusReportHandler =
|
||||
new CommandStatusReportHandler();
|
||||
NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
|
||||
StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
|
||||
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
|
||||
@ -202,6 +205,7 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
|
||||
eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
|
||||
eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
|
||||
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
|
||||
eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
|
||||
|
||||
scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
|
||||
.OZONE_ADMINISTRATORS);
|
||||
|
@ -17,8 +17,14 @@
|
||||
package org.apache.hadoop.hdds.scm;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol
|
||||
.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||
import org.apache.hadoop.hdds.protocol
|
||||
.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
|
||||
import org.apache.hadoop.hdds.protocol
|
||||
.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
@ -90,6 +96,18 @@ public static List<StorageReportProto> createStorageReport(long capacity,
|
||||
return reportList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Command Status report object.
|
||||
* @return CommandStatusReportsProto
|
||||
*/
|
||||
public static CommandStatusReportsProto createCommandStatusReport(
|
||||
List<CommandStatus> reports) {
|
||||
CommandStatusReportsProto.Builder report = CommandStatusReportsProto
|
||||
.newBuilder();
|
||||
report.addAllCmdStatus(reports);
|
||||
return report.build();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get specified number of DatanodeDetails and registered them with node
|
||||
|
@ -0,0 +1,137 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.command;
|
||||
|
||||
import org.apache.hadoop.hdds.HddsIdFactory;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.CommandStatus;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
||||
.CommandStatusReportFromDatanode;
|
||||
|
||||
import org.apache.hadoop.hdds.server.events.Event;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
public class TestCommandStatusReportHandler implements EventPublisher {
|
||||
|
||||
private static Logger LOG = LoggerFactory
|
||||
.getLogger(TestCommandStatusReportHandler.class);
|
||||
private CommandStatusReportHandler cmdStatusReportHandler;
|
||||
private String storagePath = GenericTestUtils.getRandomizedTempPath()
|
||||
.concat("/" + UUID.randomUUID().toString());
|
||||
;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
cmdStatusReportHandler = new CommandStatusReportHandler();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommandStatusReport() {
|
||||
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
|
||||
.captureLogs(LOG);
|
||||
|
||||
CommandStatusReportFromDatanode report = this.getStatusReport(Collections
|
||||
.emptyList());
|
||||
cmdStatusReportHandler.onMessage(report, this);
|
||||
assertFalse(logCapturer.getOutput().contains("DeleteBlockCommandStatus"));
|
||||
assertFalse(logCapturer.getOutput().contains
|
||||
("CloseContainerCommandStatus"));
|
||||
assertFalse(logCapturer.getOutput().contains
|
||||
("ReplicateCommandStatus"));
|
||||
|
||||
|
||||
report = this.getStatusReport(this.getCommandStatusList());
|
||||
cmdStatusReportHandler.onMessage(report, this);
|
||||
assertTrue(logCapturer.getOutput().contains("firing event of type " +
|
||||
"DeleteBlockCommandStatus"));
|
||||
assertTrue(logCapturer.getOutput().contains("firing event of type " +
|
||||
"CloseContainerCommandStatus"));
|
||||
assertTrue(logCapturer.getOutput().contains("firing event of type " +
|
||||
"ReplicateCommandStatus"));
|
||||
|
||||
assertTrue(logCapturer.getOutput().contains("type: " +
|
||||
"closeContainerCommand"));
|
||||
assertTrue(logCapturer.getOutput().contains("type: " +
|
||||
"deleteBlocksCommand"));
|
||||
assertTrue(logCapturer.getOutput().contains("type: " +
|
||||
"replicateContainerCommand"));
|
||||
|
||||
}
|
||||
|
||||
private CommandStatusReportFromDatanode getStatusReport(List<CommandStatus>
|
||||
reports) {
|
||||
CommandStatusReportsProto report = TestUtils.createCommandStatusReport
|
||||
(reports);
|
||||
DatanodeDetails dn = TestUtils.getDatanodeDetails();
|
||||
return new SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode
|
||||
(dn, report);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent
|
||||
(EVENT_TYPE event, PAYLOAD payload) {
|
||||
LOG.info("firing event of type {}, payload {}", event.getName(), payload
|
||||
.toString());
|
||||
}
|
||||
|
||||
private List<CommandStatus> getCommandStatusList() {
|
||||
List<CommandStatus> reports = new ArrayList<>(3);
|
||||
|
||||
// Add status message for replication, close container and delete block
|
||||
// command.
|
||||
CommandStatus.Builder builder = CommandStatus.newBuilder();
|
||||
|
||||
builder.setCmdId(HddsIdFactory.getLongId())
|
||||
.setStatus(CommandStatus.Status.EXECUTED)
|
||||
.setType(Type.deleteBlocksCommand);
|
||||
reports.add(builder.build());
|
||||
|
||||
builder.setCmdId(HddsIdFactory.getLongId())
|
||||
.setStatus(CommandStatus.Status.EXECUTED)
|
||||
.setType(Type.closeContainerCommand);
|
||||
reports.add(builder.build());
|
||||
|
||||
builder.setMsg("Not enough space")
|
||||
.setCmdId(HddsIdFactory.getLongId())
|
||||
.setStatus(CommandStatus.Status.FAILED)
|
||||
.setType(Type.replicateContainerCommand);
|
||||
reports.add(builder.build());
|
||||
return reports;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
/**
|
||||
* Make CheckStyle Happy.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.command;
|
Loading…
Reference in New Issue
Block a user