HDDS-1210. Ratis pipeline creation doesn't check raft client reply status during initialization. Contributed by Mukul Kumar Singh.
This commit is contained in:
parent
9d87247af3
commit
2c3ec37738
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.hdds.scm.pipeline;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
@ -45,6 +46,12 @@ public final class PipelineFactory {
|
||||
new RatisPipelineProvider(nodeManager, stateManager, conf));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setProvider(ReplicationType replicationType,
|
||||
PipelineProvider provider) {
|
||||
providers.put(replicationType, provider);
|
||||
}
|
||||
|
||||
public Pipeline create(ReplicationType type, ReplicationFactor factor)
|
||||
throws IOException {
|
||||
return providers.get(type).create(factor);
|
||||
|
@ -143,7 +143,7 @@ public Pipeline create(ReplicationFactor factor,
|
||||
.build();
|
||||
}
|
||||
|
||||
private void initializePipeline(Pipeline pipeline) throws IOException {
|
||||
protected void initializePipeline(Pipeline pipeline) throws IOException {
|
||||
RatisPipelineUtils.createPipeline(pipeline, conf);
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.ratis.RatisHelper;
|
||||
import org.apache.ratis.client.RaftClient;
|
||||
import org.apache.ratis.grpc.GrpcTlsConfig;
|
||||
import org.apache.ratis.protocol.RaftClientReply;
|
||||
import org.apache.ratis.protocol.RaftGroup;
|
||||
import org.apache.ratis.protocol.RaftGroupId;
|
||||
import org.apache.ratis.protocol.RaftPeer;
|
||||
@ -71,7 +72,15 @@ public static void createPipeline(Pipeline pipeline, Configuration ozoneConf)
|
||||
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
|
||||
LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
|
||||
callRatisRpc(pipeline.getNodes(), ozoneConf,
|
||||
(raftClient, peer) -> raftClient.groupAdd(group, peer.getId()));
|
||||
(raftClient, peer) -> {
|
||||
RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
|
||||
if (reply == null || !reply.isSuccess()) {
|
||||
String msg = "Pipeline initialization failed for pipeline:"
|
||||
+ pipeline.getId() + " node:" + peer.getId();
|
||||
LOG.error(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -186,8 +195,8 @@ private static void callRatisRpc(List<DatanodeDetails> datanodes,
|
||||
rpc.accept(client, p);
|
||||
} catch (IOException ioe) {
|
||||
exceptions.add(
|
||||
new IOException("Failed invoke Ratis rpc " + rpc + " for " + d,
|
||||
ioe));
|
||||
new IOException("Failed invoke Ratis rpc " + rpc + " for " +
|
||||
d.getUuid(), ioe));
|
||||
}
|
||||
});
|
||||
if (!exceptions.isEmpty()) {
|
||||
|
@ -100,6 +100,15 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
|
||||
initializePipelineState();
|
||||
}
|
||||
|
||||
public PipelineStateManager getStateManager() {
|
||||
return stateManager;
|
||||
}
|
||||
|
||||
public void setPipelineProvider(ReplicationType replicationType,
|
||||
PipelineProvider provider) {
|
||||
pipelineFactory.setProvider(replicationType, provider);
|
||||
}
|
||||
|
||||
private void initializePipelineState() throws IOException {
|
||||
if (pipelineStore.isEmpty()) {
|
||||
LOG.info("No pipeline exists in current db");
|
||||
|
@ -31,7 +31,8 @@
|
||||
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
@ -69,8 +70,13 @@ public void testHealthyPipelineChillModeRuleWithNoPipelines()
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);
|
||||
|
||||
|
||||
PipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||
nodeManager, eventQueue);
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager,
|
||||
pipelineManager.getStateManager(), config);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
SCMChillModeManager scmChillModeManager = new SCMChillModeManager(
|
||||
config, containers, pipelineManager, eventQueue);
|
||||
|
||||
@ -109,9 +115,15 @@ public void testHealthyPipelineChillModeRuleWithPipelines() throws Exception {
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);
|
||||
|
||||
|
||||
PipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||
nodeManager, eventQueue);
|
||||
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager,
|
||||
pipelineManager.getStateManager(), config);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
|
||||
// Create 3 pipelines
|
||||
Pipeline pipeline1 =
|
||||
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
|
||||
@ -178,8 +190,13 @@ public void testHealthyPipelineChillModeRuleWithMixedPipelines()
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);
|
||||
|
||||
|
||||
PipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||
nodeManager, eventQueue);
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager,
|
||||
pipelineManager.getStateManager(), config);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
|
||||
// Create 3 pipelines
|
||||
Pipeline pipeline1 =
|
||||
|
@ -27,9 +27,10 @@
|
||||
import org.apache.hadoop.hdds.scm.HddsTestUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
@ -51,7 +52,7 @@ public class TestOneReplicaPipelineChillModeRule {
|
||||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
private OneReplicaPipelineChillModeRule rule;
|
||||
private PipelineManager pipelineManager;
|
||||
private SCMPipelineManager pipelineManager;
|
||||
private EventQueue eventQueue;
|
||||
|
||||
|
||||
@ -72,6 +73,12 @@ private void setup(int nodes, int pipelineFactorThreeCount,
|
||||
new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
|
||||
eventQueue);
|
||||
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(mockNodeManager,
|
||||
pipelineManager.getStateManager(), ozoneConfiguration);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
|
||||
createPipelines(pipelineFactorThreeCount,
|
||||
HddsProtos.ReplicationFactor.THREE);
|
||||
createPipelines(pipelineFactorOneCount,
|
||||
|
@ -35,9 +35,10 @@
|
||||
import org.apache.hadoop.hdds.scm.HddsTestUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
@ -239,9 +240,15 @@ public void testChillModePipelineExitRule() throws Exception {
|
||||
config.setBoolean(
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);
|
||||
|
||||
PipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
|
||||
nodeManager, queue);
|
||||
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager,
|
||||
pipelineManager.getStateManager(), config);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
|
||||
Pipeline pipeline = pipelineManager.createPipeline(
|
||||
HddsProtos.ReplicationType.RATIS,
|
||||
HddsProtos.ReplicationFactor.THREE);
|
||||
|
@ -24,7 +24,8 @@
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
||||
@ -49,7 +50,7 @@ public class TestCloseContainerEventHandler {
|
||||
|
||||
private static Configuration configuration;
|
||||
private static MockNodeManager nodeManager;
|
||||
private static PipelineManager pipelineManager;
|
||||
private static SCMPipelineManager pipelineManager;
|
||||
private static SCMContainerManager containerManager;
|
||||
private static long size;
|
||||
private static File testDir;
|
||||
@ -67,6 +68,11 @@ public static void setUp() throws Exception {
|
||||
nodeManager = new MockNodeManager(true, 10);
|
||||
pipelineManager =
|
||||
new SCMPipelineManager(configuration, nodeManager, eventQueue);
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager,
|
||||
pipelineManager.getStateManager(), configuration);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
containerManager = new
|
||||
SCMContainerManager(configuration, nodeManager,
|
||||
pipelineManager, new EventQueue());
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
@ -44,6 +45,9 @@
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
||||
.NodeReportFromDatanode;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
@ -83,6 +87,13 @@ public void setup() throws IOException, AuthenticationException {
|
||||
eventQueue = new EventQueue();
|
||||
scm = HddsTestUtils.getScm(conf);
|
||||
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
|
||||
SCMPipelineManager manager =
|
||||
(SCMPipelineManager)scm.getPipelineManager();
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager, manager.getStateManager(),
|
||||
conf);
|
||||
manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
containerManager = scm.getContainerManager();
|
||||
deadNodeHandler = new DeadNodeHandler(nodeManager, containerManager);
|
||||
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
|
||||
|
@ -0,0 +1,40 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdds.scm.pipeline;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Mock Ratis Pipeline Provider for Mock Nodes.
|
||||
*/
|
||||
public class MockRatisPipelineProvider extends RatisPipelineProvider {
|
||||
|
||||
public MockRatisPipelineProvider(NodeManager nodeManager,
|
||||
PipelineStateManager stateManager,
|
||||
Configuration conf) {
|
||||
super(nodeManager, stateManager, conf);
|
||||
}
|
||||
|
||||
protected void initializePipeline(Pipeline pipeline) throws IOException {
|
||||
// do nothing as the datanodes do not exists
|
||||
}
|
||||
}
|
@ -46,7 +46,7 @@ public class TestRatisPipelineProvider {
|
||||
public void init() throws Exception {
|
||||
nodeManager = new MockNodeManager(true, 10);
|
||||
stateManager = new PipelineStateManager(new OzoneConfiguration());
|
||||
provider = new RatisPipelineProvider(nodeManager,
|
||||
provider = new MockRatisPipelineProvider(nodeManager,
|
||||
stateManager, new OzoneConfiguration());
|
||||
}
|
||||
|
||||
|
@ -20,12 +20,16 @@
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.ozone.HddsDatanodeService;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
@ -81,12 +85,29 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
|
||||
init(3);
|
||||
// make sure a pipelines is created
|
||||
waitForPipelines(1);
|
||||
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
|
||||
List<HddsDatanodeService> dns = new ArrayList<>(cluster.getHddsDatanodes());
|
||||
|
||||
List<Pipeline> pipelines =
|
||||
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
|
||||
HddsProtos.ReplicationFactor.THREE);
|
||||
for (HddsDatanodeService dn : dns) {
|
||||
cluster.shutdownHddsDatanode(dn.getDatanodeDetails());
|
||||
}
|
||||
|
||||
// try creating another pipeline now
|
||||
try {
|
||||
RatisPipelineUtils.createPipeline(pipelines.get(0), conf);
|
||||
Assert.fail("pipeline creation should fail after shutting down pipeline");
|
||||
} catch (IOException ioe) {
|
||||
// in case the pipeline creation fails, MultipleIOException is thrown
|
||||
Assert.assertTrue(ioe instanceof MultipleIOException);
|
||||
}
|
||||
|
||||
// make sure pipelines is destroyed
|
||||
waitForPipelines(0);
|
||||
cluster.startHddsDatanodes();
|
||||
for (HddsDatanodeService dn : dns) {
|
||||
cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
|
||||
}
|
||||
// make sure pipelines is created after node start
|
||||
waitForPipelines(1);
|
||||
}
|
||||
|
@ -71,8 +71,13 @@ public static void cleanup() throws IOException {
|
||||
|
||||
@Test
|
||||
public void testPipelineReload() throws IOException {
|
||||
PipelineManager pipelineManager =
|
||||
SCMPipelineManager pipelineManager =
|
||||
new SCMPipelineManager(conf, nodeManager, new EventQueue());
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager,
|
||||
pipelineManager.getStateManager(), conf);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
Set<Pipeline> pipelines = new HashSet<>();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
Pipeline pipeline = pipelineManager
|
||||
@ -85,6 +90,11 @@ public void testPipelineReload() throws IOException {
|
||||
// new pipeline manager should be able to load the pipelines from the db
|
||||
pipelineManager =
|
||||
new SCMPipelineManager(conf, nodeManager, new EventQueue());
|
||||
mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager,
|
||||
pipelineManager.getStateManager(), conf);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
for (Pipeline p : pipelines) {
|
||||
pipelineManager.openPipeline(p.getId());
|
||||
}
|
||||
@ -102,8 +112,14 @@ public void testPipelineReload() throws IOException {
|
||||
|
||||
@Test
|
||||
public void testRemovePipeline() throws IOException {
|
||||
PipelineManager pipelineManager =
|
||||
SCMPipelineManager pipelineManager =
|
||||
new SCMPipelineManager(conf, nodeManager, new EventQueue());
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager,
|
||||
pipelineManager.getStateManager(), conf);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
|
||||
Pipeline pipeline = pipelineManager
|
||||
.createPipeline(HddsProtos.ReplicationType.RATIS,
|
||||
HddsProtos.ReplicationFactor.THREE);
|
||||
@ -134,8 +150,14 @@ public void testRemovePipeline() throws IOException {
|
||||
@Test
|
||||
public void testPipelineReport() throws IOException {
|
||||
EventQueue eventQueue = new EventQueue();
|
||||
PipelineManager pipelineManager =
|
||||
SCMPipelineManager pipelineManager =
|
||||
new SCMPipelineManager(conf, nodeManager, eventQueue);
|
||||
PipelineProvider mockRatisProvider =
|
||||
new MockRatisPipelineProvider(nodeManager,
|
||||
pipelineManager.getStateManager(), conf);
|
||||
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
|
||||
mockRatisProvider);
|
||||
|
||||
SCMChillModeManager scmChillModeManager =
|
||||
new SCMChillModeManager(new OzoneConfiguration(),
|
||||
new ArrayList<>(), pipelineManager, eventQueue);
|
||||
|
Loading…
Reference in New Issue
Block a user