HDDS-268. Add SCM close container watcher. Contributed by Ajay Kumar.
This commit is contained in:
parent
6ccb809c2d
commit
85c3fe341a
@ -102,13 +102,13 @@ public void start(EventQueue queue) {
|
||||
queue.addHandler(startEvent, this::handleStartEvent);
|
||||
|
||||
queue.addHandler(completionEvent, (completionPayload, publisher) -> {
|
||||
long id = completionPayload.getId();
|
||||
try {
|
||||
handleCompletion(id, publisher);
|
||||
handleCompletion(completionPayload, publisher);
|
||||
} catch (LeaseNotFoundException e) {
|
||||
//It's already done. Too late, we already retried it.
|
||||
//Not a real problem.
|
||||
LOG.warn("Completion event without active lease. Id={}", id);
|
||||
LOG.warn("Completion event without active lease. Id={}",
|
||||
completionPayload.getId());
|
||||
}
|
||||
});
|
||||
|
||||
@ -140,9 +140,11 @@ private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void handleCompletion(long id,
|
||||
EventPublisher publisher) throws LeaseNotFoundException {
|
||||
protected synchronized void handleCompletion(COMPLETION_PAYLOAD
|
||||
completionPayload, EventPublisher publisher) throws
|
||||
LeaseNotFoundException {
|
||||
metrics.incrementCompletedEvents();
|
||||
long id = completionPayload.getId();
|
||||
leaseManager.release(id);
|
||||
TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(id);
|
||||
trackedEvents.remove(payload);
|
||||
@ -196,4 +198,12 @@ public List<TIMEOUT_PAYLOAD> getTimeoutEvents(
|
||||
protected EventWatcherMetrics getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a tracked event to which the specified id is
|
||||
* mapped, or {@code null} if there is no mapping for the id.
|
||||
*/
|
||||
public TIMEOUT_PAYLOAD getTrackedEventbyId(long id) {
|
||||
return trackedEventsByID.get(id);
|
||||
}
|
||||
}
|
||||
|
@ -103,7 +103,7 @@ public long getId() {
|
||||
* Wrapper event for Replicate Command.
|
||||
*/
|
||||
public static class ReplicationStatus extends CommandStatusEvent {
|
||||
ReplicationStatus(CommandStatus cmdStatus) {
|
||||
public ReplicationStatus(CommandStatus cmdStatus) {
|
||||
super(cmdStatus);
|
||||
}
|
||||
}
|
||||
@ -112,7 +112,7 @@ public static class ReplicationStatus extends CommandStatusEvent {
|
||||
* Wrapper event for CloseContainer Command.
|
||||
*/
|
||||
public static class CloseContainerStatus extends CommandStatusEvent {
|
||||
CloseContainerStatus(CommandStatus cmdStatus) {
|
||||
public CloseContainerStatus(CommandStatus cmdStatus) {
|
||||
super(cmdStatus);
|
||||
}
|
||||
}
|
||||
@ -121,7 +121,7 @@ public static class CloseContainerStatus extends CommandStatusEvent {
|
||||
* Wrapper event for DeleteBlock Command.
|
||||
*/
|
||||
public static class DeleteBlockCommandStatus extends CommandStatusEvent {
|
||||
DeleteBlockCommandStatus(CommandStatus cmdStatus) {
|
||||
public DeleteBlockCommandStatus(CommandStatus cmdStatus) {
|
||||
super(cmdStatus);
|
||||
}
|
||||
}
|
||||
|
@ -23,12 +23,14 @@
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.server.events.EventHandler;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
|
||||
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ;
|
||||
|
||||
/**
|
||||
* In case of a node failure, volume failure, volume out of spapce, node
|
||||
@ -80,6 +82,8 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
|
||||
new CloseContainerCommand(containerID.getId(),
|
||||
info.getReplicationType(), info.getPipelineID()));
|
||||
publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand);
|
||||
publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, new
|
||||
CloseContainerRetryableReq(containerID));
|
||||
}
|
||||
try {
|
||||
// Finalize event will make sure the state of the container transitions
|
||||
@ -107,4 +111,26 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Class to create retryable event. Prevents redundant requests for same
|
||||
* container Id.
|
||||
*/
|
||||
public static class CloseContainerRetryableReq implements
|
||||
IdentifiableEventPayload {
|
||||
|
||||
private ContainerID containerID;
|
||||
public CloseContainerRetryableReq(ContainerID containerID) {
|
||||
this.containerID = containerID;
|
||||
}
|
||||
|
||||
public ContainerID getContainerID() {
|
||||
return containerID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getId() {
|
||||
return containerID.getId();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,100 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* <p>http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* <p>Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.container;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
|
||||
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
|
||||
.CloseContainerStatus;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.server.events.Event;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.hdds.server.events.EventWatcher;
|
||||
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
|
||||
.CloseContainerRetryableReq;
|
||||
import org.apache.hadoop.ozone.lease.LeaseManager;
|
||||
import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* This watcher will watch for CLOSE_CONTAINER_STATUS events fired from
|
||||
* CommandStatusReport. If required it will re-trigger CloseContainer command
|
||||
* for DataNodes to CloseContainerEventHandler.
|
||||
*/
|
||||
public class CloseContainerWatcher extends
|
||||
EventWatcher<CloseContainerRetryableReq, CloseContainerStatus> {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(CloseContainerWatcher.class);
|
||||
private final Mapping containerManager;
|
||||
|
||||
public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent,
|
||||
Event<CloseContainerStatus> completionEvent,
|
||||
LeaseManager<Long> leaseManager, Mapping containerManager) {
|
||||
super(startEvent, completionEvent, leaseManager);
|
||||
this.containerManager = containerManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onTimeout(EventPublisher publisher,
|
||||
CloseContainerRetryableReq payload) {
|
||||
// Let CloseContainerEventHandler handle this message.
|
||||
this.resendEventToHandler(payload.getId(), publisher);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onFinished(EventPublisher publisher,
|
||||
CloseContainerRetryableReq payload) {
|
||||
LOG.trace("CloseContainerCommand for containerId: {} executed ", payload
|
||||
.getContainerID().getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void handleCompletion(CloseContainerStatus status,
|
||||
EventPublisher publisher) throws LeaseNotFoundException {
|
||||
// If status is PENDING then return without doing anything.
|
||||
if(status.getCmdStatus().getStatus().equals(Status.PENDING)){
|
||||
return;
|
||||
}
|
||||
|
||||
CloseContainerRetryableReq closeCont = getTrackedEventbyId(status.getId());
|
||||
super.handleCompletion(status, publisher);
|
||||
// If status is FAILED then send a msg to Handler to resend the command.
|
||||
if (status.getCmdStatus().getStatus().equals(Status.FAILED) && closeCont
|
||||
!= null) {
|
||||
this.resendEventToHandler(closeCont.getId(), publisher);
|
||||
}
|
||||
}
|
||||
|
||||
private void resendEventToHandler(long containerID, EventPublisher
|
||||
publisher) {
|
||||
try {
|
||||
// Check if container is still open
|
||||
if (containerManager.getContainer(containerID).isContainerOpen()) {
|
||||
publisher.fireEvent(SCMEvents.CLOSE_CONTAINER,
|
||||
ContainerID.valueof(containerID));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error in CloseContainerWatcher while processing event " +
|
||||
"for containerId {} ExceptionMsg: ", containerID, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
@ -27,6 +27,7 @@
|
||||
.DeleteBlockCommandStatus;
|
||||
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
|
||||
.ReplicationStatus;
|
||||
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
||||
.ContainerActionsFromDatanode;
|
||||
@ -103,6 +104,16 @@ public final class SCMEvents {
|
||||
public static final TypedEvent<ContainerID> CLOSE_CONTAINER =
|
||||
new TypedEvent<>(ContainerID.class, "Close_Container");
|
||||
|
||||
/**
|
||||
* A CLOSE_CONTAINER_RETRYABLE_REQ will be triggered by
|
||||
* CloseContainerEventHandler after sending a SCMCommand to DataNode.
|
||||
* CloseContainerWatcher will track this event. Watcher will be responsible
|
||||
* for retrying it in event of failure or timeout.
|
||||
*/
|
||||
public static final TypedEvent<CloseContainerRetryableReq>
|
||||
CLOSE_CONTAINER_RETRYABLE_REQ = new TypedEvent<>(
|
||||
CloseContainerRetryableReq.class, "Close_Container_Retryable");
|
||||
|
||||
/**
|
||||
* This event will be triggered whenever a new datanode is registered with
|
||||
* SCM.
|
||||
|
@ -37,6 +37,7 @@
|
||||
import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
|
||||
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
|
||||
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
|
||||
import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
|
||||
@ -257,6 +258,13 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
|
||||
scmContainerManager.getStateManager(), eventQueue,
|
||||
commandWatcherLeaseManager);
|
||||
|
||||
// setup CloseContainer watcher
|
||||
CloseContainerWatcher closeContainerWatcher =
|
||||
new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
SCMEvents.CLOSE_CONTAINER_STATUS, commandWatcherLeaseManager,
|
||||
scmContainerManager);
|
||||
closeContainerWatcher.start(eventQueue);
|
||||
|
||||
scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
|
||||
.OZONE_ADMINISTRATORS);
|
||||
scmUsername = UserGroupInformation.getCurrentUser().getUserName();
|
||||
|
@ -0,0 +1,287 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container;
|
||||
|
||||
import org.apache.hadoop.hdds.HddsIdFactory;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.CommandStatus;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
|
||||
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
|
||||
.CloseContainerStatus;
|
||||
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
|
||||
.CloseContainerRetryableReq;
|
||||
import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.server.events.EventHandler;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
import org.apache.hadoop.hdds.server.events.EventWatcher;
|
||||
import org.apache.hadoop.ozone.lease.LeaseManager;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test class for {@link CloseContainerWatcher}.
|
||||
* */
|
||||
public class TestCloseContainerWatcher implements EventHandler<ContainerID> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(TestCloseContainerWatcher.class);
|
||||
private static EventWatcher<CloseContainerRetryableReq, CloseContainerStatus>
|
||||
watcher;
|
||||
private static LeaseManager<Long> leaseManager;
|
||||
private static ContainerMapping containerMapping = Mockito
|
||||
.mock(ContainerMapping.class);
|
||||
private static EventQueue queue;
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(1000*15);
|
||||
|
||||
@After
|
||||
public void stop() {
|
||||
leaseManager.shutdown();
|
||||
queue.close();
|
||||
}
|
||||
|
||||
/*
|
||||
* This test will test watcher for Failure status event.
|
||||
* */
|
||||
@Test
|
||||
public void testWatcherForFailureStatusEvent() throws
|
||||
InterruptedException, IOException {
|
||||
setupWatcher(90000L);
|
||||
long id1 = HddsIdFactory.getLongId();
|
||||
long id2 = HddsIdFactory.getLongId();
|
||||
queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
|
||||
setupMock(id1, id2, true);
|
||||
GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
|
||||
.captureLogs(LOG);
|
||||
GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
|
||||
.captureLogs(CloseContainerWatcher.LOG);
|
||||
GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
|
||||
testLogger.clearOutput();
|
||||
watcherLogger.clearOutput();
|
||||
|
||||
CommandStatus cmdStatus1 = CommandStatus.newBuilder()
|
||||
.setCmdId(id1)
|
||||
.setStatus(CommandStatus.Status.FAILED)
|
||||
.setType(Type.closeContainerCommand).build();
|
||||
CommandStatus cmdStatus2 = CommandStatus.newBuilder()
|
||||
.setCmdId(id2)
|
||||
.setStatus(CommandStatus.Status.FAILED)
|
||||
.setType(Type.closeContainerCommand).build();
|
||||
|
||||
// File events to watcher
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
new CloseContainerRetryableReq(ContainerID.valueof(id1)));
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
new CloseContainerRetryableReq(ContainerID.valueof(id2)));
|
||||
Thread.sleep(10L);
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
|
||||
CloseContainerStatus(cmdStatus1));
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
|
||||
CloseContainerStatus(cmdStatus2));
|
||||
|
||||
Thread.sleep(1000*4L);
|
||||
// validation
|
||||
assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " +
|
||||
"containerId: " + id1 + " executed"));
|
||||
assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " +
|
||||
"containerId: " + id2 + " executed"));
|
||||
assertTrue(
|
||||
testLogger.getOutput().contains("Handling closeContainerEvent " +
|
||||
"for containerId: id=" + id1));
|
||||
assertTrue(testLogger.getOutput().contains("Handling closeContainerEvent " +
|
||||
"for containerId: id=" + id2));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWatcherForPendingStatusEvent() throws
|
||||
InterruptedException, IOException {
|
||||
setupWatcher(90000L);
|
||||
long id1 = HddsIdFactory.getLongId();
|
||||
long id2 = HddsIdFactory.getLongId();
|
||||
queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
|
||||
setupMock(id1, id2, true);
|
||||
GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
|
||||
.captureLogs(LOG);
|
||||
GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
|
||||
.captureLogs(CloseContainerWatcher.LOG);
|
||||
GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
|
||||
testLogger.clearOutput();
|
||||
watcherLogger.clearOutput();
|
||||
|
||||
CommandStatus cmdStatus1 = CommandStatus.newBuilder()
|
||||
.setCmdId(id1)
|
||||
.setStatus(CommandStatus.Status.PENDING)
|
||||
.setType(Type.closeContainerCommand).build();
|
||||
CommandStatus cmdStatus2 = CommandStatus.newBuilder()
|
||||
.setCmdId(id2)
|
||||
.setStatus(CommandStatus.Status.PENDING)
|
||||
.setType(Type.closeContainerCommand).build();
|
||||
|
||||
// File events to watcher
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
new CloseContainerRetryableReq(ContainerID.valueof(id1)));
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
new CloseContainerRetryableReq(ContainerID.valueof(id2)));
|
||||
Thread.sleep(10L);
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
|
||||
CloseContainerStatus(cmdStatus1));
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
|
||||
CloseContainerStatus(cmdStatus2));
|
||||
|
||||
Thread.sleep(1000*2L);
|
||||
// validation
|
||||
assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand "
|
||||
+ "for containerId: " + id1 + " executed"));
|
||||
assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand "
|
||||
+ "for containerId: " + id2 + " executed"));
|
||||
assertFalse(testLogger.getOutput().contains("Handling "
|
||||
+ "closeContainerEvent for containerId: id=" + id1));
|
||||
assertFalse(testLogger.getOutput().contains("Handling "
|
||||
+ "closeContainerEvent for containerId: id=" + id2));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWatcherForExecutedStatusEvent()
|
||||
throws IOException, InterruptedException {
|
||||
setupWatcher(90000L);
|
||||
long id1 = HddsIdFactory.getLongId();
|
||||
long id2 = HddsIdFactory.getLongId();
|
||||
queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
|
||||
setupMock(id1, id2, true);
|
||||
GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
|
||||
.captureLogs(LOG);
|
||||
GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
|
||||
.captureLogs(CloseContainerWatcher.LOG);
|
||||
GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
|
||||
testLogger.clearOutput();
|
||||
watcherLogger.clearOutput();
|
||||
|
||||
// When both of the pending event are executed successfully by DataNode
|
||||
CommandStatus cmdStatus1 = CommandStatus.newBuilder()
|
||||
.setCmdId(id1)
|
||||
.setStatus(CommandStatus.Status.EXECUTED)
|
||||
.setType(Type.closeContainerCommand).build();
|
||||
CommandStatus cmdStatus2 = CommandStatus.newBuilder()
|
||||
.setCmdId(id2)
|
||||
.setStatus(CommandStatus.Status.EXECUTED)
|
||||
.setType(Type.closeContainerCommand).build();
|
||||
// File events to watcher
|
||||
testLogger.clearOutput();
|
||||
watcherLogger.clearOutput();
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
new CloseContainerRetryableReq(ContainerID.valueof(id1)));
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
new CloseContainerRetryableReq(ContainerID.valueof(id2)));
|
||||
Thread.sleep(10L);
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS,
|
||||
new CloseContainerStatus(cmdStatus1));
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS,
|
||||
new CloseContainerStatus(cmdStatus2));
|
||||
|
||||
Thread.sleep(1000*3L);
|
||||
// validation
|
||||
assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand "
|
||||
+ "for containerId: " + id1 + " executed"));
|
||||
assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand "
|
||||
+ "for containerId: " + id2 + " executed"));
|
||||
assertFalse(testLogger.getOutput().contains("Handling "
|
||||
+ "closeContainerEvent for containerId: id=" + id1));
|
||||
assertFalse(testLogger.getOutput().contains("Handling "
|
||||
+ "closeContainerEvent for containerId: id=" + id2));
|
||||
}
|
||||
|
||||
private void setupWatcher(long time) {
|
||||
leaseManager = new LeaseManager<>("TestCloseContainerWatcher#LeaseManager",
|
||||
time);
|
||||
leaseManager.start();
|
||||
watcher = new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
SCMEvents.CLOSE_CONTAINER_STATUS, leaseManager, containerMapping);
|
||||
queue = new EventQueue();
|
||||
watcher.start(queue);
|
||||
}
|
||||
|
||||
/*
|
||||
* This test will fire two retryable closeContainer events. Both will timeout.
|
||||
* First event container will be open at time of handling so it should be
|
||||
* sent back to appropriate handler. Second event container will be closed,
|
||||
* so it should not be retried.
|
||||
* */
|
||||
@Test
|
||||
public void testWatcherRetryableTimeoutHandling() throws InterruptedException,
|
||||
IOException {
|
||||
|
||||
long id1 = HddsIdFactory.getLongId();
|
||||
long id2 = HddsIdFactory.getLongId();
|
||||
setupWatcher(1000L);
|
||||
queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
|
||||
setupMock(id1, id2, false);
|
||||
GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
|
||||
.captureLogs(LOG);
|
||||
testLogger.clearOutput();
|
||||
|
||||
// File events to watcher
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
new CloseContainerRetryableReq(ContainerID.valueof(id1)));
|
||||
queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
|
||||
new CloseContainerRetryableReq(ContainerID.valueof(id2)));
|
||||
|
||||
Thread.sleep(1000L + 10);
|
||||
|
||||
// validation
|
||||
assertTrue(testLogger.getOutput().contains("Handling "
|
||||
+ "closeContainerEvent for containerId: id=" + id1));
|
||||
assertFalse(testLogger.getOutput().contains("Handling "
|
||||
+ "closeContainerEvent for containerId: id=" + id2));
|
||||
}
|
||||
|
||||
|
||||
private void setupMock(long id1, long id2, boolean isOpen)
|
||||
throws IOException {
|
||||
ContainerInfo containerInfo = Mockito.mock(ContainerInfo.class);
|
||||
ContainerInfo containerInfo2 = Mockito.mock(ContainerInfo.class);
|
||||
when(containerMapping.getContainer(id1)).thenReturn(containerInfo);
|
||||
when(containerMapping.getContainer(id2)).thenReturn(containerInfo2);
|
||||
when(containerInfo.isContainerOpen()).thenReturn(true);
|
||||
when(containerInfo2.isContainerOpen()).thenReturn(isOpen);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ContainerID containerID, EventPublisher publisher) {
|
||||
LOG.info("Handling closeContainerEvent for containerId: {}", containerID);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user