diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index a31b77292c..c264650162 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -22,6 +22,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.ozone.client.OzoneClientUtils; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .CloseContainerHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler; @@ -82,6 +84,7 @@ public DatanodeStateMachine(DatanodeID datanodeID, // trick. commandDispatcher = CommandDispatcher.newBuilder() .addHandler(new ContainerReportHandler()) + .addHandler(new CloseContainerHandler()) .addHandler(new DeleteBlocksCommandHandler( container.getContainerManager(), conf)) .setConnectionManager(connectionManager) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java new file mode 100644 index 0000000000..f5fbcd75b4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Container Report handler. + */ +public class CloseContainerHandler implements CommandHandler { + static final Logger LOG = + LoggerFactory.getLogger(CloseContainerHandler.class); + private int invocationCount; + private long totalTime; + + /** + * Constructs a ContainerReport handler. + */ + public CloseContainerHandler() { + } + + /** + * Handles a given SCM command. + * + * @param command - SCM Command + * @param container - Ozone Container. + * @param context - Current Context. + * @param connectionManager - The SCMs that we are talking to. + */ + @Override + public void handle(SCMCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager) { + LOG.debug("Processing Close Container command."); + invocationCount++; + long startTime = Time.monotonicNow(); + String containerName = "UNKNOWN"; + try { + + SCMCloseContainerCmdResponseProto + closeContainerProto = + SCMCloseContainerCmdResponseProto + .parseFrom(command.getProtoBufMessage()); + containerName = closeContainerProto.getContainerName(); + + container.getContainerManager().closeContainer(containerName); + + } catch (Exception e) { + LOG.error("Can't close container " + containerName, e); + } finally { + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; + } + } + + /** + * Returns the command type that this command handler handles. + * + * @return Type + */ + @Override + public Type getCommandType() { + return Type.closeContainerCommand; + } + + /** + * Returns number of times this handler has been invoked. + * + * @return int + */ + @Override + public int getInvocationCount() { + return invocationCount; + } + + /** + * Returns the average time this function takes to run. + * + * @return long + */ + @Override + public long getAverageRunTime() { + if (invocationCount > 0) { + return totalTime / invocationCount; + } + return 0; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 2177a8215f..b4cce23a62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine.EndPointStates; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; @@ -166,6 +167,16 @@ private void processResponse(SCMHeartbeatResponseProto response, this.context.addCommand(db); } break; + case closeContainerCommand: + CloseContainerCommand closeContainer = + CloseContainerCommand.getFromProtobuf( + commandResponseProto.getCloseContainerProto()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received SCM container close request for container {}", + closeContainer.getContainerName()); + } + this.context.addCommand(closeContainer); + break; default: throw new IllegalArgumentException("Unknown response : " + commandResponseProto.getCmdType().name()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java new file mode 100644 index 0000000000..79aa132fe7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type; +import static org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type.closeContainerCommand; + +/** + * Asks datanode to close a container. + */ +public class CloseContainerCommand + extends SCMCommand { + + private String containerName; + + public CloseContainerCommand(String containerName) { + this.containerName = containerName; + } + + /** + * Returns the type of this command. + * + * @return Type + */ + @Override + public Type getType() { + return closeContainerCommand; + } + + /** + * Gets the protobuf message of this object. + * + * @return A protobuf message. + */ + @Override + public byte[] getProtoBufMessage() { + return getProto().toByteArray(); + } + + public SCMCloseContainerCmdResponseProto getProto() { + return SCMCloseContainerCmdResponseProto.newBuilder() + .setContainerName(containerName).build(); + } + + public static CloseContainerCommand getFromProtobuf( + SCMCloseContainerCmdResponseProto closeContainerProto) { + Preconditions.checkNotNull(closeContainerProto); + return new CloseContainerCommand(closeContainerProto.getContainerName()); + + } + + public String getContainerName() { + return containerName; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index 1fa02458d1..b547a305c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -26,6 +26,7 @@ import com.google.protobuf.BlockingService; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.io.IOUtils; @@ -34,13 +35,15 @@ import org.apache.hadoop.jmx.ServiceRuntimeInfoImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.common.StorageInfo; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -88,7 +91,6 @@ import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; @@ -102,6 +104,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -112,10 +115,8 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.Collections; import java.util.stream.Collectors; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; import static org.apache.hadoop.ozone.protocol.proto .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result; import static org.apache.hadoop.scm.ScmConfigKeys @@ -580,6 +581,10 @@ public SCMCommandResponseProto getCommandResponse(SCMCommand cmd, return builder.setCmdType(Type.deleteBlocksCommand) .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto()) .build(); + case closeContainerCommand: + return builder.setCmdType(Type.closeContainerCommand) + .setCloseContainerProto(((CloseContainerCommand)cmd).getProto()) + .build(); default: throw new IllegalArgumentException("Not implemented"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto index 0fb4bbdfa7..6aa6af7833 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -210,6 +210,13 @@ This command tells the data node to send in the container report when possible message SendContainerReportProto { } +/** +This command asks the datanode to close a specific container. +*/ +message SCMCloseContainerCmdResponseProto { + required string containerName = 1; +} + /** Type of commands supported by SCM to datanode protocol. */ @@ -219,6 +226,7 @@ enum Type { sendContainerReport = 4; reregisterCommand = 5; deleteBlocksCommand = 6; + closeContainerCommand = 7; } /* @@ -232,6 +240,7 @@ message SCMCommandResponseProto { optional SCMReregisterCmdResponseProto reregisterProto = 6; optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7; required string datanodeUUID = 8; + optional SCMCloseContainerCmdResponseProto closeContainerProto = 9; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java new file mode 100644 index 0000000000..1007f3f40d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneClassicCluster; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.ReplicationFactor; +import org.apache.hadoop.ozone.client.ReplicationType; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.client.rest.OzoneException; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB; +import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * Test to behaviour of the datanode when recieve close container command. + */ +public class TestCloseContainerHandler { + + @Test + public void test() throws IOException, TimeoutException, InterruptedException, + OzoneException { + + //setup a cluster (1G free space is enough for a unit test) + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(OZONE_SCM_CONTAINER_SIZE_GB, "1"); + MiniOzoneClassicCluster cluster = + new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); + cluster.waitOzoneReady(); + + //the easiest way to create an open container is creating a key + OzoneClientFactory.setConfiguration(conf); + OzoneClient client = OzoneClientFactory.getClient(); + ObjectStore objectStore = client.getObjectStore(); + objectStore.createVolume("test"); + objectStore.getVolume("test").createBucket("test"); + OzoneOutputStream key = objectStore.getVolume("test").getBucket("test") + .createKey("test", 1024, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE); + key.write("test".getBytes()); + key.close(); + + //get the name of a valid container + KsmKeyArgs keyArgs = + new KsmKeyArgs.Builder().setVolumeName("test").setBucketName("test") + .setType(OzoneProtos.ReplicationType.STAND_ALONE) + .setFactor(OzoneProtos.ReplicationFactor.ONE).setDataSize(1024) + .setKeyName("test").build(); + + KsmKeyLocationInfo ksmKeyLocationInfo = + cluster.getKeySpaceManager().lookupKey(keyArgs).getKeyLocationVersions() + .get(0).getBlocksLatestVersionOnly().get(0); + + String containerName = ksmKeyLocationInfo.getContainerName(); + + Assert.assertFalse(isContainerClosed(cluster, containerName)); + + //send the order to close the container + cluster.getStorageContainerManager().getScmNodeManager() + .addDatanodeCommand(cluster.getDataNodes().get(0).getDatanodeId(), + new CloseContainerCommand(containerName)); + + GenericTestUtils.waitFor(() -> isContainerClosed(cluster, containerName), + 500, + 5 * 1000); + + //double check if it's really closed (waitFor also throws an exception) + Assert.assertTrue(isContainerClosed(cluster, containerName)); + } + + private Boolean isContainerClosed(MiniOzoneClassicCluster cluster, + String containerName) { + ContainerData containerData; + try { + containerData = cluster.getDataNodes().get(0).getOzoneContainerManager() + .getContainerManager().readContainer(containerName); + return !containerData.isOpen(); + } catch (StorageContainerException e) { + throw new AssertionError(e); + } + } + +} \ No newline at end of file