HDDS-1550. MiniOzoneCluster is not shutting down all the threads during shutdown. Contributed by Mukul Kumar Singh. (#1050)
commit e5ffb88257
parent b5d30e4914
@@ -120,6 +120,11 @@ public abstract Container importContainer(
       TarContainerPacker packer)
       throws IOException;
 
+  /**
+   * Stop the Handler.
+   */
+  public abstract void stop();
+
   /**
    * Marks the container for closing. Moves the container to CLOSING state.
    *
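The new abstract stop() gives every Handler implementation a hook to halt its background work during shutdown; OzoneContainer.stop() invokes it below, and KeyValueHandler uses it to stop the block-deleting service. A minimal self-contained sketch of the intended shape, assuming a subclass that owns a scheduled executor; HandlerBase, ExampleHandler, and the field are illustrative, not part of this commit:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stand-in for the real Handler, which declares more abstract methods.
abstract class HandlerBase {
  /** Stop the Handler. */
  public abstract void stop();
}

class ExampleHandler extends HandlerBase {
  // Hypothetical background service owned by this handler.
  private final ScheduledExecutorService background =
      Executors.newSingleThreadScheduledExecutor();

  @Override
  public void stop() {
    // Same pattern the commit applies elsewhere: request shutdown, then
    // bound the wait so teardown cannot hang on a stuck worker thread.
    background.shutdown();
    try {
      background.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}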
@@ -23,10 +23,13 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * ReportManager is responsible for managing all the {@link ReportPublisher}
@@ -34,6 +37,8 @@
  * which should be used for scheduling the reports.
  */
 public final class ReportManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReportManager.class);
 
   private final StateContext context;
   private final List<ReportPublisher> publishers;
@@ -71,6 +76,11 @@ public void init() {
    */
   public void shutdown() {
     executorService.shutdown();
+    try {
+      executorService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Failed to shutdown Report Manager", e);
+    }
   }
 
   /**
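The bounded awaitTermination added above is the heart of the fix: a ScheduledExecutorService spawns non-daemon worker threads, and until shutdown() is called and the tasks drain, those threads keep the JVM (or a MiniOzoneCluster test) alive. A standalone demonstration of that leak class, not Ozone code:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownDemo {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    executor.scheduleAtFixedRate(
        () -> System.out.println("report tick"), 0, 1, TimeUnit.SECONDS);

    Thread.sleep(3000);

    // Comment out the next two lines and main() returns but the process
    // lingers -- the same symptom HDDS-1550 reports for MiniOzoneCluster.
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }
}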
@@ -57,6 +57,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Creates a Grpc server endpoint that acts as the communication layer for
@@ -172,6 +173,11 @@ public void start() throws IOException {
   public void stop() {
     if (isStarted) {
       server.shutdown();
+      try {
+        server.awaitTermination(5, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOG.error("failed to shutdown XceiverServerGrpc", e);
+      }
       isStarted = false;
     }
   }
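Here server is an io.grpc.Server, and shutdown() followed by a bounded awaitTermination(timeout, unit) is the stock grpc-java stop sequence: shutdown() rejects new calls while letting in-flight ones finish. A minimal sketch assuming grpc-java on the classpath; the shutdownNow() fallback is an extra step of this sketch, not something the hunk adds:

import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class GrpcStopSketch {
  public static void main(String[] args)
      throws IOException, InterruptedException {
    // An empty server on an ephemeral port, just to exercise the lifecycle.
    Server server = ServerBuilder.forPort(0).build().start();

    server.shutdown();                      // stop accepting new calls
    if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
      server.shutdownNow();                 // force-cancel stragglers
    }
  }
}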
@@ -158,6 +158,11 @@ public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() {
     return volumeChoosingPolicy;
   }
 
+  @Override
+  public void stop() {
+    blockDeletingService.shutdown();
+  }
+
   @Override
   public ContainerCommandResponseProto handle(
       ContainerCommandRequestProto request, Container container,
@@ -200,6 +200,7 @@ public void stop() {
     stopContainerScrub();
     writeChannel.stop();
     readChannel.stop();
+    this.handlers.values().forEach(Handler::stop);
     hddsDispatcher.shutdown();
     volumeSet.shutdown();
   }
@@ -207,6 +207,7 @@ public NodeState getNodeState(DatanodeDetails datanodeDetails) {
   public void close() throws IOException {
     unregisterMXBean();
     metrics.unRegister();
+    nodeStateManager.close();
   }
 
   /**
@@ -84,6 +84,7 @@
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -189,6 +190,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private final OzoneConfiguration configuration;
   private final SafeModeHandler safeModeHandler;
   private SCMContainerMetrics scmContainerMetrics;
+  private MetricsSystem ms;
 
   /**
    * Network topology Map.
@@ -754,7 +756,7 @@ public void start() throws IOException {
         buildRpcServerStartMessage(
             "StorageContainerLocationProtocol RPC server",
             getClientRpcAddress()));
-    DefaultMetricsSystem.initialize("StorageContainerManager");
+    ms = DefaultMetricsSystem.initialize("StorageContainerManager");
 
     commandWatcherLeaseManager.start();
     getClientProtocolServer().start();
@@ -874,6 +876,10 @@ public void stop() {
       LOG.error("SCM Metadata store stop failed", ex);
     }
 
+    if (ms != null) {
+      ms.stop();
+    }
+
     scmSafeModeManager.stop();
   }
 
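The StorageContainerManager changes pair up: start() now keeps the MetricsSystem handle that DefaultMetricsSystem.initialize() returns, and stop() uses it to halt the metrics threads that previously outlived the SCM. A condensed sketch of that lifecycle, assuming only hadoop-common on the classpath:

import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

public class MetricsLifecycleSketch {
  private MetricsSystem ms;

  public void start() {
    // Keep the handle; a bare initialize() leaves nothing to stop later.
    ms = DefaultMetricsSystem.initialize("StorageContainerManager");
  }

  public void stop() {
    if (ms != null) {
      ms.stop(); // terminates the metrics sink/snapshot threads
    }
  }

  public static void main(String[] args) {
    MetricsLifecycleSketch sketch = new MetricsLifecycleSketch();
    sketch.start();
    sketch.stop();
  }
}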
@@ -244,7 +244,7 @@ void initializeConfiguration() throws IOException {
         1, TimeUnit.SECONDS);
     conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, 8);
+    conf.setInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE, 2);
     conf.setInt("hdds.scm.replication.thread.interval", 10 * 1000);
     conf.setInt("hdds.scm.replication.event.timeout", 20 * 1000);
   }
@@ -340,19 +340,20 @@ public void stop() {
       ozoneManager.join();
     }
 
+    if (!hddsDatanodes.isEmpty()) {
+      LOG.info("Shutting the HddsDatanodes");
+      hddsDatanodes.parallelStream()
+          .forEach(dn -> {
+            dn.stop();
+            dn.join();
+          });
+    }
+
     if (scm != null) {
       LOG.info("Stopping the StorageContainerManager");
       scm.stop();
       scm.join();
     }
-
-    if (!hddsDatanodes.isEmpty()) {
-      LOG.info("Shutting the HddsDatanodes");
-      for (HddsDatanodeService hddsDatanode : hddsDatanodes) {
-        hddsDatanode.stop();
-        hddsDatanode.join();
-      }
-    }
   }
 
   /**
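Two things change in this stop() method: the datanodes are now stopped before the SCM rather than after, and the sequential for-loop becomes a parallelStream(), so one slow datanode no longer serializes the whole cluster teardown. A self-contained sketch of the pattern; the Service interface stands in for HddsDatanodeService:

import java.util.Arrays;
import java.util.List;

public class ParallelStopSketch {
  // Stand-in for HddsDatanodeService: stop() requests shutdown,
  // join() waits for the service threads to exit.
  interface Service {
    void stop();
    void join();
  }

  static void stopAll(List<Service> services) {
    // parallelStream() spreads the stop()+join() pairs across the common
    // fork-join pool, so the waits overlap instead of adding up.
    services.parallelStream().forEach(s -> {
      s.stop();
      s.join();
    });
  }

  public static void main(String[] args) {
    Service noop = new Service() {
      public void stop() { System.out.println("stop"); }
      public void join() { System.out.println("join"); }
    };
    stopAll(Arrays.asList(noop, noop, noop));
  }
}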
@@ -568,6 +569,8 @@ private void configureSCM() {
     conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
     conf.set(ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
     conf.setInt(ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY, numOfScmHandlers);
+    conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        "3s");
     configureSCMheartbeat();
   }
 
@@ -20,10 +20,8 @@
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -65,7 +63,6 @@
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -520,6 +517,11 @@ public void testCloseContainerCommandOnRestart() throws Exception {
 
     // Stop processing HB
     scm.getDatanodeProtocolServer().stop();
+
+    scm.getContainerManager().updateContainerState(selectedContainer
+        .containerID(), HddsProtos.LifeCycleEvent.FINALIZE);
+    cluster.restartStorageContainerManager(true);
+    scm = cluster.getStorageContainerManager();
     EventPublisher publisher = mock(EventPublisher.class);
     ReplicationManager replicationManager = scm.getReplicationManager();
     Field f = replicationManager.getClass().getDeclaredField("eventPublisher");
@@ -528,13 +530,6 @@ public void testCloseContainerCommandOnRestart() throws Exception {
     modifiersField.setAccessible(true);
     modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
     f.set(replicationManager, publisher);
-
-    doNothing().when(publisher).fireEvent(any(TypedEvent.class),
-        any(CommandForDatanode.class));
-
-    scm.getContainerManager().updateContainerState(selectedContainer
-        .containerID(), HddsProtos.LifeCycleEvent.FINALIZE);
-    cluster.restartStorageContainerManager(true);
     scm.getReplicationManager().start();
     Thread.sleep(2000);
 
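With the restart moved ahead of the mock injection in the previous hunk, the doNothing() stub and the duplicated FINALIZE/restart lines become dead weight and are dropped here. The surviving reflection idiom, clearing FINAL on a private field so a mock can be swapped in, deserves a note: it relies on the writable Field.modifiers field, which works on the Java 8 runtimes Hadoop targeted at the time but is rejected on Java 12+. A self-contained sketch of that idiom, under that Java-version assumption:

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class FinalFieldSwap {
  static class Holder {
    private final Object value;
    Holder(Object v) { this.value = v; }
    Object value() { return value; }
  }

  public static void main(String[] args) throws Exception {
    Holder holder = new Holder("original");

    Field f = Holder.class.getDeclaredField("value");
    f.setAccessible(true);
    // Strip the FINAL flag, as the test does for "eventPublisher".
    // (Java 12+ hides Field.modifiers; this is a pre-12 idiom.)
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);

    f.set(holder, "replaced");
    System.out.println(holder.value()); // prints "replaced"
  }
}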
@@ -572,7 +567,7 @@ public boolean matches(Object argument) {
             (CloseContainerCommand) cmdRight.getCommand();
         return cmdRight.getDatanodeId().equals(uuid)
             && left.getContainerID() == right.getContainerID()
-            && left.getPipelineID() == right.getPipelineID()
+            && left.getPipelineID().equals(right.getPipelineID())
             && left.getType() == right.getType()
             && left.getProto().equals(right.getProto());
       }
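The one-line matcher fix replaces reference comparison with value comparison: getContainerID() and getType() return a primitive and an enum, where == is correct, but getPipelineID() returns an object wrapping a UUID, and two equal wrappers are rarely the same instance, so the old check could only pass by accident. A plain-UUID demonstration of the difference:

import java.util.UUID;

public class EqualityDemo {
  public static void main(String[] args) {
    String id = "c9d5a8f2-3e44-4b7a-9c1d-2f6e8a0b4c3d";
    UUID left = UUID.fromString(id);
    UUID right = UUID.fromString(id);

    System.out.println(left == right);      // false: distinct instances
    System.out.println(left.equals(right)); // true: same value
  }
}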
@@ -31,7 +31,17 @@
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_NODE_REPORT_INTERVAL;
+
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
@@ -61,6 +71,10 @@ public void setUp() throws Exception {
     conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
         interval, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, SECONDS);
+    conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
     conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
 
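The four added lines shorten every datanode report interval to one second so the test observes node-state transitions quickly instead of waiting out the defaults. Configuration.setTimeDuration/getTimeDuration is the API in play; a small usage sketch, with the key name written out literally for illustration:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;

public class TimeDurationDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setTimeDuration("hdds.heartbeat.interval", 1, TimeUnit.SECONDS);

    // Read back in a different unit; the default (30s here) applies only
    // when the key is unset.
    long ms = conf.getTimeDuration(
        "hdds.heartbeat.interval", 30_000, TimeUnit.MILLISECONDS);
    System.out.println(ms); // 1000
  }
}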