HDDS-161. Add functionality to queue ContainerClose command from SCM Heartbeat Response to Ratis.

Contributed by Shashikant Banerjee.
This commit is contained in:
Anu Engineer 2018-06-13 17:50:42 -07:00
parent 22994889dc
commit 7547740e5c
14 changed files with 412 additions and 14 deletions

View File

@ -403,4 +403,13 @@ private Thread getCommandHandlerThread(Runnable processCommandQueue) {
public long getCommandHandled() {
return commandsHandled;
}
/**
 * Exposes the dispatcher held by this state machine so that tests can
 * inspect the handlers registered on it.
 *
 * @return the CommandDispatcher instance stored in this object
 */
@VisibleForTesting
public CommandDispatcher getCommandDispatcher() {
  return this.commandDispatcher;
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
@ -29,6 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
/**
* Handler for close container command received from SCM.
*/
@ -67,8 +71,23 @@ public void handle(SCMCommand command, OzoneContainer container,
CloseContainerCommandProto
.parseFrom(command.getProtoBufMessage());
containerID = closeContainerProto.getContainerID();
HddsProtos.ReplicationType replicationType =
closeContainerProto.getReplicationType();
container.getContainerManager().closeContainer(containerID);
ContainerProtos.CloseContainerRequestProto.Builder closeRequest =
ContainerProtos.CloseContainerRequestProto.newBuilder();
closeRequest.setContainerID(containerID);
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setCloseContainer(closeRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(
context.getParent().getDatanodeDetails().getUuidString());
// submit the close container request for the XceiverServer to handle
container.submitContainerRequest(
request.build(), replicationType);
} catch (Exception e) {
LOG.error("Can't close container " + containerID, e);

View File

@ -77,6 +77,10 @@ private CommandDispatcher(OzoneContainer container, SCMConnectionManager
}
}
/**
 * Looks up the handler stored in the handler map under the
 * closeContainerCommand type.
 *
 * @return the CommandHandler mapped to Type.closeContainerCommand
 */
public CommandHandler getCloseContainerHandler() {
  final CommandHandler closeHandler =
      handlerMap.get(Type.closeContainerCommand);
  return closeHandler;
}
/**
* Dispatch the command to the correct handler.
*

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.shaded.io.netty.channel.Channel;
import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
@ -129,4 +130,10 @@ public void stop() {
channel.close().awaitUninterruptibly();
}
}
/**
 * Hands the request straight to the container dispatcher of this server.
 * Whatever the dispatcher returns is not inspected here.
 *
 * @param request the container command to dispatch
 * @throws IOException declared by the XceiverServerSpi contract
 */
@Override
public void submitRequest(
    ContainerProtos.ContainerCommandRequestProto request) throws IOException {
  this.storageContainer.dispatch(request);
}
}

View File

@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@ -44,6 +45,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
private int port;
private Server server;
private final ContainerDispatcher storageContainer;
/**
* Constructs a Grpc server class.
@ -77,6 +79,7 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
.maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
.addService(new GrpcXceiverService(dispatcher))
.build();
storageContainer = dispatcher;
}
@Override
@ -103,4 +106,10 @@ public void start() throws IOException {
public void stop() {
server.shutdown();
}
/**
 * Forwards the request to the dispatcher this gRPC server was built with.
 * Whatever the dispatcher returns is not inspected here.
 *
 * @param request the container command to dispatch
 * @throws IOException declared by the XceiverServerSpi contract
 */
@Override
public void submitRequest(
    ContainerProtos.ContainerCommandRequestProto request) throws IOException {
  this.storageContainer.dispatch(request);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.common.transport.server;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.io.IOException;
@ -40,4 +41,10 @@ public interface XceiverServerSpi {
*/
HddsProtos.ReplicationType getServerType();
/**
 * Submits a container request to be performed by the replication pipeline
 * that the implementing server belongs to.
 *
 * @param request ContainerCommandRequest to be executed
 * @throws IOException declared so that implementations can report a failure
 *         while handing the request over
 */
void submitRequest(ContainerProtos.ContainerCommandRequestProto request)
throws IOException;
}

View File

@ -18,10 +18,12 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@ -33,10 +35,12 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@ -49,8 +53,10 @@
import java.net.SocketAddress;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Creates a ratis server endpoint that acts as the communication layer for
@ -58,6 +64,12 @@
*/
public final class XceiverServerRatis implements XceiverServerSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
// Counter shared by all instances; feeds the call id of every request
// built in this class.
private static final AtomicLong callIdCounter = new AtomicLong();

/**
 * Returns the next call id. The sign bit is masked off so the value stays
 * non-negative even once the underlying counter has wrapped around.
 *
 * @return the counter value before the increment, with the sign bit cleared
 */
private static long nextCallId() {
  final long rawId = callIdCounter.getAndIncrement();
  return rawId & Long.MAX_VALUE;
}
private final int port;
private final RaftServer server;
private ThreadPoolExecutor writeChunkExecutor;
@ -241,4 +253,46 @@ public int getIPCPort() {
public HddsProtos.ReplicationType getServerType() {
return HddsProtos.ReplicationType.RATIS;
}
}
/**
 * Gives tests access to the RaftServer instance held by this object.
 *
 * @return the RaftServer stored in the {@code server} field
 */
@VisibleForTesting
public RaftServer getServer() {
  return this.server;
}
/**
 * Inspects the reply of a request submitted to the Raft server and logs
 * the failure it carries, if any. Nothing is retried or rethrown here.
 *
 * @param reply reply produced for a previously submitted RaftClientRequest
 */
private void processReply(RaftClientReply reply) {
  // NotLeader exception is thrown only when the raft server to which the
  // request is submitted is not the leader. The request will be rejected
  // and will eventually be executed once the request comes via the leader
  // node.
  NotLeaderException notLeaderException = reply.getNotLeaderException();
  if (notLeaderException != null) {
    // Use the value that was just null-checked instead of asking the reply
    // for it a second time.
    LOG.info(notLeaderException.getLocalizedMessage());
  }
  StateMachineException stateMachineException =
      reply.getStateMachineException();
  if (stateMachineException != null) {
    // In case the request could not be completed, StateMachine Exception
    // will be thrown. For now, just log it; passing the exception keeps the
    // stack trace in the log.
    // If the container could not be closed, SCM will come to know
    // via containerReports. CloseContainer should be retried via SCM.
    LOG.error(stateMachineException.getLocalizedMessage(),
        stateMachineException);
  }
}
/**
 * Wraps the container command in a RaftClientRequest and submits it
 * asynchronously to the local Raft server. The outcome is only logged:
 * replies go through processReply, and a future that completes
 * exceptionally is logged as an error instead of being dropped.
 *
 * @param request the container command to replicate through Ratis
 * @throws IOException declared by the XceiverServerSpi contract
 */
@Override
public void submitRequest(
    ContainerProtos.ContainerCommandRequestProto request) throws IOException {
  ClientId clientId = ClientId.randomId();
  RaftClientRequest raftClientRequest =
      new RaftClientRequest(clientId, server.getId(),
          RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0,
          Message.valueOf(request.toByteString()), RaftClientRequest
          // ReplicationLevel.ALL ensures the transactions corresponding to
          // the request here are applied on all the raft servers.
          .writeRequestType(RaftProtos.ReplicationLevel.ALL));
  CompletableFuture<RaftClientReply> reply =
      server.submitClientRequestAsync(raftClientRequest);
  // thenAccept would skip the callback on exceptional completion, hiding
  // the failure; whenComplete sees both outcomes.
  reply.whenComplete((raftReply, failure) -> {
    if (failure != null) {
      LOG.error("Failed to submit {} request to the Ratis server",
          request.getCmdType(), failure);
    } else {
      processReply(raftReply);
    }
  });
}
}

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@ -72,7 +74,7 @@
* layer.
*/
public class OzoneContainer {
private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(OzoneContainer.class);
private final Configuration ozoneConfig;
@ -269,9 +271,65 @@ public List<ContainerData> getClosedContainerReports() throws IOException {
return this.manager.getClosedContainerReports();
}
/**
 * Scans the configured server instances for the Ratis-backed one.
 *
 * @return the first XceiverServerRatis found in {@code server}, or null
 *         when there is none
 */
private XceiverServerSpi getRatisSerer() {
  XceiverServerSpi ratisServer = null;
  for (XceiverServerSpi candidate : server) {
    if (candidate instanceof XceiverServerRatis) {
      ratisServer = candidate;
      break;
    }
  }
  return ratisServer;
}
/**
 * Scans the configured server instances for one that is not Ratis-backed.
 *
 * @return the first entry of {@code server} that is not an
 *         XceiverServerRatis, or null when every entry is one
 */
private XceiverServerSpi getStandaAloneSerer() {
  XceiverServerSpi standAloneServer = null;
  for (XceiverServerSpi candidate : server) {
    if (candidate instanceof XceiverServerRatis) {
      continue;
    }
    standAloneServer = candidate;
    break;
  }
  return standAloneServer;
}
/**
 * Gives access to the container manager held by this OzoneContainer.
 *
 * @return the ContainerManager stored in the {@code manager} field
 */
@VisibleForTesting
public ContainerManager getContainerManager() {
  return manager;
}
}
/**
 * Submits a container command to the XceiverServer that matches the given
 * replication type: the Ratis server for RATIS, otherwise the first
 * non-Ratis server.
 *
 * @param request the container command to submit; only the command types
 *        accepted by getContainerIdForCmd are supported
 * @param replicationType replication type used to pick the server
 * @throws IOException if the selected server fails to accept the request
 */
public void submitContainerRequest(
    ContainerProtos.ContainerCommandRequestProto request,
    HddsProtos.ReplicationType replicationType) throws IOException {
  XceiverServerSpi serverInstance;
  // Resolved up front so an unsupported command is rejected before any
  // server is contacted.
  long containerId = getContainerIdForCmd(request);
  // The log texts below are matched verbatim by
  // TestCloseContainerByPipeline; keep them unchanged.
  if (replicationType == HddsProtos.ReplicationType.RATIS) {
    serverInstance = getRatisSerer();
    Preconditions.checkNotNull(serverInstance);
    serverInstance.submitRequest(request);
    LOG.info("submitting {} request over RATIS server for container {}",
        request.getCmdType(), containerId);
  } else {
    serverInstance = getStandaAloneSerer();
    Preconditions.checkNotNull(serverInstance);
    // Submit through the instance that was just null-checked rather than
    // looking the server up a second time.
    serverInstance.submitRequest(request);
    LOG.info(
        "submitting {} request over STAND_ALONE server for container {}",
        request.getCmdType(), containerId);
  }
}
/**
 * Extracts the container ID a command refers to.
 *
 * @param request the container command to inspect
 * @return the container ID carried by the CloseContainer payload
 * @throws IllegalArgumentException for every command type other than
 *         CloseContainer
 */
private long getContainerIdForCmd(
    ContainerProtos.ContainerCommandRequestProto request)
    throws IllegalArgumentException {
  final ContainerProtos.Type cmdType = request.getCmdType();
  // Right now only closeContainer is queued over the XceiverServer. Any
  // other command is rejected below; add further branches here when more
  // commands need to be supported.
  if (cmdType == ContainerProtos.Type.CloseContainer) {
    return request.getCloseContainer().getContainerID();
  }
  throw new IllegalArgumentException("Cmd " + request.getCmdType()
      + " not supported over HearBeat Response");
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.protocol.commands;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
@ -31,9 +32,12 @@ public class CloseContainerCommand
extends SCMCommand<CloseContainerCommandProto> {
private long containerID;
private HddsProtos.ReplicationType replicationType;
/**
 * Creates a close command for one container.
 *
 * @param containerID ID of the container this command refers to
 * @param replicationType replication type stored with the command and
 *        written into its protobuf form
 */
public CloseContainerCommand(long containerID) {
public CloseContainerCommand(long containerID,
HddsProtos.ReplicationType replicationType) {
this.containerID = containerID;
this.replicationType = replicationType;
}
/**
@ -58,13 +62,15 @@ public byte[] getProtoBufMessage() {
/**
 * Builds the protobuf form of this command from its container ID and
 * replication type.
 *
 * @return a newly built CloseContainerCommandProto
 */
public CloseContainerCommandProto getProto() {
return CloseContainerCommandProto.newBuilder()
.setContainerID(containerID).build();
.setContainerID(containerID)
.setReplicationType(replicationType).build();
}
/**
 * Recreates a command from its protobuf form, copying the container ID and
 * the replication type.
 *
 * @param closeContainerProto protobuf message to convert; checked for null
 *        before use
 * @return a new CloseContainerCommand carrying the values of the message
 */
public static CloseContainerCommand getFromProtobuf(
CloseContainerCommandProto closeContainerProto) {
Preconditions.checkNotNull(closeContainerProto);
return new CloseContainerCommand(closeContainerProto.getContainerID());
return new CloseContainerCommand(closeContainerProto.getContainerID(),
closeContainerProto.getReplicationType());
}

View File

@ -223,6 +223,7 @@ This command asks the datanode to close a specific container.
*/
message CloseContainerCommandProto {
// ID of the container the datanode is asked to close.
required int64 containerID = 1;
// Replication type sent along with the command; the datanode uses it to
// choose the server the close request is submitted to.
// NOTE(review): this is a newly added required field - confirm that no
// sender still produces this message without it.
required hadoop.hdds.ReplicationType replicationType = 2;
}
/**

View File

@ -63,7 +63,8 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
for (DatanodeDetails datanode : info.getPipeline().getMachines()) {
containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(),
new CloseContainerCommand(containerID.getId()));
new CloseContainerCommand(containerID.getId(),
info.getPipeline().getType()));
}
try {
// Finalize event will make sure the state of the container transitions

View File

@ -127,11 +127,12 @@ public void close(HddsProtos.SCMContainerInfo info) {
// to SCM. In that case also, data node will ignore this command.
HddsProtos.Pipeline pipeline = info.getPipeline();
for (HddsProtos.DatanodeDetailsProto datanodeDetails :
pipeline.getPipelineChannel().getMembersList()) {
for (HddsProtos.DatanodeDetailsProto datanodeDetails : pipeline
.getPipelineChannel().getMembersList()) {
nodeManager.addDatanodeCommand(
DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(),
new CloseContainerCommand(info.getContainerID()));
new CloseContainerCommand(info.getContainerID(),
pipeline.getPipelineChannel().getType()));
}
if (!commandIssued.containsKey(info.getContainerID())) {
commandIssued.put(info.getContainerID(),

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeoutException;
public class TestCloseContainerByPipeline {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static OzoneClient client;
private static ObjectStore objectStore;
/**
 * Starts a MiniOzoneCluster with three datanodes and creates the volume
 * and bucket that the tests write their keys into.
 * <p>
 * The handler type is set to OZONE_HANDLER_DISTRIBUTED before the cluster
 * is built.
 *
 * @throws Exception if the cluster or the client cannot be set up
 */
@BeforeClass
public static void init() throws Exception {
  conf = new OzoneConfiguration();
  conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
      OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
  cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
  cluster.waitForClusterToBeReady();
  // Writing a key is the easiest way to get an open container, so prepare
  // a client plus the volume and bucket the keys will live in.
  client = OzoneClientFactory.getClient(conf);
  objectStore = client.getObjectStore();
  final String testName = "test";
  objectStore.createVolume(testName);
  objectStore.getVolume(testName).createBucket(testName);
}
/**
 * Shuts the MiniOzoneCluster down, if one was started.
 */
@AfterClass
public static void shutdown() {
  if (cluster == null) {
    return;
  }
  cluster.shutdown();
}
/**
 * Writes a STAND_ALONE key, sends a close command for its container to the
 * single datanode of the pipeline and verifies that the container gets
 * closed through the stand-alone server rather than the Ratis server.
 */
@Test
public void testCloseContainerViaStandaAlone()
    throws IOException, TimeoutException, InterruptedException {
  OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
      .createKey("standalone", 1024, ReplicationType.STAND_ALONE,
          ReplicationFactor.ONE);
  // Explicit charset: the payload must not depend on the platform default.
  key.write("standalone".getBytes(StandardCharsets.UTF_8));
  key.close();

  //get the name of a valid container
  KsmKeyArgs keyArgs =
      new KsmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
          .setType(HddsProtos.ReplicationType.STAND_ALONE)
          .setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024)
          .setKeyName("standalone").build();
  KsmKeyLocationInfo ksmKeyLocationInfo =
      cluster.getKeySpaceManager().lookupKey(keyArgs).getKeyLocationVersions()
          .get(0).getBlocksLatestVersionOnly().get(0);
  long containerID = ksmKeyLocationInfo.getContainerID();
  List<DatanodeDetails> datanodes =
      cluster.getStorageContainerManager().getContainerInfo(containerID)
          .getPipeline().getMachines();
  // assertEquals reports the actual pipeline size when the check fails.
  Assert.assertEquals(1, datanodes.size());

  DatanodeDetails datanodeDetails = datanodes.get(0);
  Assert
      .assertFalse(isContainerClosed(cluster, containerID, datanodeDetails));

  GenericTestUtils.LogCapturer logCapturer =
      GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG);
  //send the order to close the container
  cluster.getStorageContainerManager().getScmNodeManager()
      .addDatanodeCommand(datanodeDetails.getUuid(),
          new CloseContainerCommand(containerID,
              HddsProtos.ReplicationType.STAND_ALONE));

  GenericTestUtils
      .waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails),
          500, 5 * 1000);

  //double check if it's really closed (waitFor also throws an exception)
  Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails));
  Assert.assertTrue(logCapturer.getOutput().contains(
      "submitting CloseContainer request over STAND_ALONE server for"
          + " container " + containerID));
  // Make sure it was really closed via StandAlone not Ratis server
  Assert.assertFalse((logCapturer.getOutput().contains(
      "submitting CloseContainer request over RATIS server for container "
          + containerID)));
}
/**
 * Writes a RATIS key with three replicas, sends a close command for its
 * container to every datanode of the pipeline and verifies that the
 * container gets closed on all of them through the Ratis server rather
 * than the stand-alone server.
 */
@Test
public void testCloseContainerViaRatis() throws IOException,
    TimeoutException, InterruptedException {
  OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
      .createKey("ratis", 1024, ReplicationType.RATIS,
          ReplicationFactor.THREE);
  // Explicit charset: the payload must not depend on the platform default.
  key.write("ratis".getBytes(StandardCharsets.UTF_8));
  key.close();

  //get the name of a valid container
  KsmKeyArgs keyArgs = new KsmKeyArgs.Builder().setVolumeName("test").
      setBucketName("test").setType(HddsProtos.ReplicationType.RATIS)
      .setFactor(HddsProtos.ReplicationFactor.THREE).setDataSize(1024)
      .setKeyName("ratis").build();
  KsmKeyLocationInfo ksmKeyLocationInfo =
      cluster.getKeySpaceManager().lookupKey(keyArgs).getKeyLocationVersions()
          .get(0).getBlocksLatestVersionOnly().get(0);
  long containerID = ksmKeyLocationInfo.getContainerID();
  List<DatanodeDetails> datanodes =
      cluster.getStorageContainerManager().getContainerInfo(containerID)
          .getPipeline().getMachines();
  // assertEquals reports the actual pipeline size when the check fails.
  Assert.assertEquals(3, datanodes.size());

  GenericTestUtils.LogCapturer logCapturer =
      GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG);

  // Check the open state of every replica before any close command is
  // queued; a close triggered through one datanode is replicated to the
  // others and could otherwise race with the remaining assertions.
  for (DatanodeDetails details : datanodes) {
    Assert.assertFalse(isContainerClosed(cluster, containerID, details));
  }
  for (DatanodeDetails details : datanodes) {
    //send the order to close the container
    cluster.getStorageContainerManager().getScmNodeManager()
        .addDatanodeCommand(details.getUuid(),
            new CloseContainerCommand(containerID,
                HddsProtos.ReplicationType.RATIS));
  }

  for (DatanodeDetails datanodeDetails : datanodes) {
    GenericTestUtils.waitFor(
        () -> isContainerClosed(cluster, containerID, datanodeDetails), 500,
        5 * 1000);
    //double check if it's really closed (waitFor also throws an exception)
    Assert.assertTrue(
        isContainerClosed(cluster, containerID, datanodeDetails));
  }
  Assert.assertFalse(logCapturer.getOutput().contains(
      "submitting CloseContainer request over STAND_ALONE "
          + "server for container " + containerID));
  // Make sure it was really closed via Ratis, not the StandAlone server
  Assert.assertTrue((logCapturer.getOutput().contains(
      "submitting CloseContainer request over RATIS server for container "
          + containerID)));
}
/**
 * Tells whether the given container is closed on the given datanode.
 * When a closed container is found, this also asserts that the close
 * container handler of that datanode has been invoked at least once.
 *
 * @param cluster cluster whose datanode services are searched
 * @param containerID ID of the container to look up
 * @param datanode datanode whose copy of the container is checked
 * @return true if the datanode reports the container as not open,
 *         false otherwise (including when the datanode is not found)
 */
private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID,
    DatanodeDetails datanode) {
  try {
    for (HddsDatanodeService service : cluster.getHddsDatanodes()) {
      if (!datanode.equals(service.getDatanodeDetails())) {
        continue;
      }
      ContainerData data =
          service.getDatanodeStateMachine().getContainer()
              .getContainerManager().readContainer(containerID);
      if (data.isOpen()) {
        continue;
      }
      // make sure the closeContainerHandler on the Datanode is invoked
      Assert.assertTrue(
          service.getDatanodeStateMachine().getCommandDispatcher()
              .getCloseContainerHandler().getInvocationCount() > 0);
      return true;
    }
  } catch (StorageContainerException e) {
    throw new AssertionError(e);
  }
  return false;
}
}

View File

@ -83,12 +83,13 @@ public void test() throws IOException, TimeoutException, InterruptedException,
Assert.assertFalse(isContainerClosed(cluster, containerID));
DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
.getDatanodeDetails();
DatanodeDetails datanodeDetails =
cluster.getHddsDatanodes().get(0).getDatanodeDetails();
//send the order to close the container
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(),
new CloseContainerCommand(containerID));
new CloseContainerCommand(containerID,
HddsProtos.ReplicationType.STAND_ALONE));
GenericTestUtils.waitFor(() -> isContainerClosed(cluster, containerID),
500,