From c4d96400280d9a1595b82435297dbbed2668c485 Mon Sep 17 00:00:00 2001
From: Nanda kumar
Date: Fri, 16 Nov 2018 23:07:45 +0530
Subject: [PATCH] HDDS-801. Quasi close the container when close is not executed via Ratis. Contributed by Nanda kumar.

---
 .../proto/DatanodeContainerProtocol.proto     |   4 +-
 .../container/common/impl/ContainerData.java  |  17 +-
 .../container/common/impl/HddsDispatcher.java |  14 +-
 .../common/interfaces/Container.java          |  15 +-
 .../container/common/interfaces/Handler.java  |  68 +++++-
 .../report/ContainerReportPublisher.java      |   3 +-
 .../statemachine/DatanodeStateMachine.java    |   2 +-
 .../common/statemachine/StateContext.java     |  20 --
 .../CloseContainerCommandHandler.java         | 136 ++++++------
 .../states/endpoint/RegisterEndpointTask.java |   2 +-
 .../transport/server/XceiverServerGrpc.java   |   5 +
 .../transport/server/XceiverServerSpi.java    |   7 +
 .../server/ratis/XceiverServerRatis.java      |  18 +-
 .../container/keyvalue/KeyValueContainer.java |  43 +++-
 .../container/keyvalue/KeyValueHandler.java   | 121 ++++++----
 .../ozoneimpl/ContainerController.java        | 124 +++++++++++
 .../container/ozoneimpl/OzoneContainer.java   | 124 ++++-------
 .../DownloadAndImportReplicator.java          |  16 +-
 .../OnDemandContainerReplicationSource.java   |  10 +-
 .../commands/CloseContainerCommand.java       |  12 +-
 .../StorageContainerDatanodeProtocol.proto    |   8 +-
 .../common/impl/TestHddsDispatcher.java       |  26 ++-
 .../common/interfaces/TestHandler.java        |  16 +-
 .../TestCloseContainerCommandHandler.java     | 210 ++++++++++++++++++
 .../keyvalue/TestKeyValueHandler.java         |   8 +-
 .../hdds/scm/block/BlockManagerImpl.java      |   2 +-
 .../container/CloseContainerEventHandler.java |   4 +-
 .../hadoop/hdds/scm/node/TestNodeManager.java |   3 +-
 .../ozone/container/common/TestEndPoint.java  |   8 +-
 .../dist/src/main/smoketest/basic/basic.robot |   2 +-
 .../hadoop/ozone/TestMiniOzoneCluster.java    |  18 +-
 .../TestCloseContainerHandlingByClient.java   |  26 +--
 .../TestCloseContainerByPipeline.java         |  80 +++----
 .../TestCloseContainerHandler.java            |   3 +-
 .../metrics/TestContainerMetrics.java         |  22 +-
 .../container/server/TestContainerServer.java |  26 ++-
 .../genesis/BenchMarkDatanodeDispatcher.java  |  19 +-
 .../TestFreonWithDatanodeFastRestart.java     |   3 +-
 38 files changed, 876 insertions(+), 369 deletions(-)
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
 create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java

diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 5eecdcbf02..ac2d27740e 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -229,8 +229,8 @@ message ContainerDataProto {
   enum State {
     OPEN = 1;
     CLOSING = 2;
-    CLOSED = 3;
-    QUASI_CLOSED = 4;
+    QUASI_CLOSED = 3;
+    CLOSED = 4;
     UNHEALTHY = 5;
     INVALID = 6;
   }
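The renumbering above places QUASI_CLOSED between CLOSING and CLOSED so the enum's numeric order tracks the container lifecycle. A minimal sketch of the transitions this patch implies, illustrative only and not part of the patch (promoting a QUASI_CLOSED replica to CLOSED is still a TODO in CloseContainerCommandHandler below):

    // Transitions implied by HDDS-801 (sketch, not patch code):
    // OPEN -> CLOSING -> {QUASI_CLOSED, CLOSED}; QUASI_CLOSED -> CLOSED later.
    enum State { OPEN, CLOSING, QUASI_CLOSED, CLOSED, UNHEALTHY, INVALID }

    static boolean isValidTransition(State from, State to) {
      switch (from) {
      case OPEN:         return to == State.CLOSING;
      case CLOSING:      return to == State.QUASI_CLOSED || to == State.CLOSED;
      case QUASI_CLOSED: return to == State.CLOSED;
      default:           return false; // terminal or invalid states
      }
    }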
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index ad199f0e27..6f515176ba 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -237,10 +237,25 @@ public synchronized boolean isValid() {
    * checks if the container is closed.
    * @return - boolean
    */
-  public synchronized boolean isClosed() {
+  public synchronized boolean isClosed() {
     return ContainerDataProto.State.CLOSED == state;
   }
 
+  /**
+   * checks if the container is quasi closed.
+   * @return - boolean
+   */
+  public synchronized boolean isQuasiClosed() {
+    return ContainerDataProto.State.QUASI_CLOSED == state;
+  }
+
+  /**
+   * Marks this container as quasi closed.
+   */
+  public synchronized void quasiCloseContainer() {
+    setState(ContainerDataProto.State.QUASI_CLOSED);
+  }
+
   /**
    * Marks this container as closed.
    */
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index c52d97374a..29f9b20a83 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -20,7 +20,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -76,18 +75,14 @@ public class HddsDispatcher implements ContainerDispatcher {
    * XceiverServerHandler.
    */
   public HddsDispatcher(Configuration config, ContainerSet contSet,
-      VolumeSet volumes, StateContext context) {
+      VolumeSet volumes, Map<ContainerType, Handler> handlers,
+      StateContext context, ContainerMetrics metrics) {
     this.conf = config;
     this.containerSet = contSet;
     this.volumeSet = volumes;
     this.context = context;
-    this.handlers = Maps.newHashMap();
-    this.metrics = ContainerMetrics.create(conf);
-    for (ContainerType containerType : ContainerType.values()) {
-      handlers.put(containerType,
-          Handler.getHandlerForContainerType(
-              containerType, conf, containerSet, volumeSet, metrics));
-    }
+    this.handlers = handlers;
+    this.metrics = metrics;
     this.containerCloseThreshold = conf.getFloat(
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD,
         HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@@ -347,6 +342,7 @@ public void setScmId(String scmId) {
     }
   }
 
+  @VisibleForTesting
   public Container getContainer(long containerID) {
     return containerSet.getContainer(containerID);
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 65147cc4f6..2bda74707d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -82,11 +82,24 @@ void update(Map<String, String> metaData, boolean forceUpdate)
   ContainerProtos.ContainerDataProto.State getContainerState();
 
   /**
-   * Closes a open container, if it is already closed or does not exist a
+   * Marks the container for closing. Moves the container to CLOSING state.
+   */
+  void markContainerForClose() throws StorageContainerException;
+
+  /**
+   * Quasi closes an open container, if it is already closed or does not exist a
    * StorageContainerException is thrown.
    *
    * @throws StorageContainerException
    */
+  void quasiClose() throws StorageContainerException;
+
+  /**
+   * Closes an open/quasi closed container, if it is already closed or does not
+   * exist a StorageContainerException is thrown.
+   *
+   * @throws StorageContainerException
+   */
   void close() throws StorageContainerException;
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 53e1c68a4e..19e61c407d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -29,8 +29,12 @@
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
@@ -47,26 +51,47 @@ public abstract class Handler {
   protected String scmID;
   protected final ContainerMetrics metrics;
 
-  protected Handler(Configuration config, ContainerSet contSet,
-      VolumeSet volumeSet, ContainerMetrics containerMetrics) {
-    conf = config;
-    containerSet = contSet;
+  private final StateContext context;
+
+  protected Handler(Configuration config, StateContext context,
+      ContainerSet contSet, VolumeSet volumeSet,
+      ContainerMetrics containerMetrics) {
+    this.conf = config;
+    this.context = context;
+    this.containerSet = contSet;
     this.volumeSet = volumeSet;
     this.metrics = containerMetrics;
   }
 
-  public static Handler getHandlerForContainerType(ContainerType containerType,
-      Configuration config, ContainerSet contSet, VolumeSet volumeSet,
-      ContainerMetrics metrics) {
+  public static Handler getHandlerForContainerType(
+      final ContainerType containerType, final Configuration config,
+      final StateContext context, final ContainerSet contSet,
+      final VolumeSet volumeSet, final ContainerMetrics metrics) {
     switch (containerType) {
     case KeyValueContainer:
-      return new KeyValueHandler(config, contSet, volumeSet, metrics);
+      return new KeyValueHandler(config, context, contSet, volumeSet, metrics);
     default:
       throw new IllegalArgumentException("Handler for ContainerType: " +
           containerType + "doesn't exist.");
     }
   }
 
+  /**
+   * This should be called whenever there is a state change. It will trigger
+   * an ICR to SCM.
+   *
+   * @param container Container for which ICR has to be sent
+   */
+  protected void sendICR(final Container container)
+      throws StorageContainerException {
+    IncrementalContainerReportProto icr = IncrementalContainerReportProto
+        .newBuilder()
+        .addReport(container.getContainerReport())
+        .build();
+    context.addReport(icr);
+    context.getParent().triggerHeartbeat();
+  }
+
   public abstract ContainerCommandResponseProto handle(
       ContainerCommandRequestProto msg, Container container);
 
@@ -80,6 +105,33 @@ public abstract Container importContainer(
       TarContainerPacker packer)
       throws IOException;
 
+  /**
+   * Marks the container for closing. Moves the container to CLOSING state.
+   *
+   * @param container container to update
+   * @throws IOException in case of exception
+   */
+  public abstract void markContainerForClose(Container container)
+      throws IOException;
+
+  /**
+   * Moves the Container to QUASI_CLOSED state.
+   *
+   * @param container container to be quasi closed
+   * @throws IOException
+   */
+  public abstract void quasiCloseContainer(Container container)
+      throws IOException;
+
+  /**
+   * Moves the Container to CLOSED state.
+   *
+   * @param container container to be closed
+   * @throws IOException
+   */
+  public abstract void closeContainer(Container container)
+      throws IOException;
+
   public void setScmID(String scmId) {
     this.scmID = scmId;
   }
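The new sendICR helper is the hook that ties datanode-local state changes to SCM: a handler applies a transition, then reports just that container through an incremental report and an immediate heartbeat. A sketch of the intended call pattern (illustrative; KeyValueHandler later in this patch follows it, with its idempotency and state checks omitted here):

    // Sketch: apply the state change first, then report it via sendICR().
    @Override
    public void closeContainer(Container container) throws IOException {
      container.close();   // persists CLOSED in the .container file
      sendICR(container);  // ICR + heartbeat, so SCM learns of it promptly
    }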
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
index ccb9a9aada..b92e3b0e1f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java
@@ -80,6 +80,7 @@ private long getRandomReportDelay() {
   @Override
   protected ContainerReportsProto getReport() throws IOException {
-    return getContext().getParent().getContainer().getContainerReport();
+    return getContext().getParent().getContainer()
+        .getController().getContainerReport();
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 12c33ff947..1d87071ba9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -99,7 +99,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
     ContainerReplicator replicator =
         new DownloadAndImportReplicator(container.getContainerSet(),
-            container.getDispatcher(),
+            container.getController(),
             new SimpleContainerDownloader(conf), new TarContainerPacker());
 
     supervisor =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index e9288241c8..953f73035a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -59,8 +59,6 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
-import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
-
 /**
  * Current Context of State Machine.
  */
@@ -115,24 +113,6 @@ public DatanodeStateMachine getParent() {
     return parent;
   }
 
-  /**
-   * Get the container server port.
-   * @return The container server port if available, return -1 if otherwise
-   */
-  public int getContainerPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getContainerServerPort();
-  }
-
-  /**
-   * Gets the Ratis Port.
-   * @return int , return -1 if not valid.
-   */
-  public int getRatisPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
-  }
-
   /**
    * Returns true if we are entering a new state.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 0838be2096..9d258128da 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -16,21 +16,20 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerDataProto.State;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
@@ -38,17 +37,19 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.UUID;
 
 /**
  * Handler for close container command received from SCM.
  */
 public class CloseContainerCommandHandler implements CommandHandler {
-  static final Logger LOG =
+
+  private static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
+
   private int invocationCount;
   private long totalTime;
-  private boolean cmdExecuted;
 
   /**
    * Constructs a ContainerReport handler.
@@ -67,79 +68,70 @@ public CloseContainerCommandHandler() {
   @Override
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing Close Container command.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-    // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
-    long containerID = -1;
     try {
-      CloseContainerCommandProto closeContainerProto =
+      LOG.debug("Processing Close Container command.");
+      invocationCount++;
+      final long startTime = Time.monotonicNow();
+      final DatanodeDetails datanodeDetails = context.getParent()
+          .getDatanodeDetails();
+      final CloseContainerCommandProto closeCommand =
           CloseContainerCommandProto.parseFrom(command.getProtoBufMessage());
-      containerID = closeContainerProto.getContainerID();
-      // CloseContainer operation is idempotent, if the container is already
-      // closed, then do nothing.
-      // TODO: Non-existent container should be handled properly
-      Container container =
-          ozoneContainer.getContainerSet().getContainer(containerID);
-      if (container == null) {
-        LOG.error("Container {} does not exist in datanode. "
-            + "Container close failed.", containerID);
-        cmdExecuted = false;
-        return;
-      }
-      ContainerData containerData = container.getContainerData();
-      State containerState = container.getContainerData().getState();
-      if (containerState != State.CLOSED) {
-        LOG.debug("Closing container {}.", containerID);
-        // when a closeContainerCommand arrives at a Datanode and if the
-        // container is open, each replica will be moved to closing state first.
-        if (containerState == State.OPEN) {
-          containerData.setState(State.CLOSING);
+      final ContainerController controller = ozoneContainer.getController();
+      final long containerId = closeCommand.getContainerID();
+      try {
+        // TODO: Closing of QUASI_CLOSED container.
+
+        final Container container = controller.getContainer(containerId);
+
+        if (container == null) {
+          LOG.error("Container #{} does not exist in datanode. "
+              + "Container close failed.", containerId);
+          return;
         }
-        // if the container is already closed, it will be just ignored.
-        // ICR will get triggered to change the replica state in SCM.
 
-        HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
-        HddsProtos.ReplicationType replicationType =
-            closeContainerProto.getReplicationType();
+        // Move the container to CLOSING state
+        controller.markContainerForClose(containerId);
 
-        ContainerProtos.ContainerCommandRequestProto.Builder request =
-            ContainerProtos.ContainerCommandRequestProto.newBuilder();
-        request.setCmdType(ContainerProtos.Type.CloseContainer);
-        request.setContainerID(containerID);
-        request.setCloseContainer(
-            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
-        request.setTraceID(UUID.randomUUID().toString());
-        request.setDatanodeUuid(
-            context.getParent().getDatanodeDetails().getUuidString());
-        // submit the close container request for the XceiverServer to handle
-        ozoneContainer.submitContainerRequest(request.build(), replicationType,
-            pipelineID);
-        // Since the container is closed, we trigger an ICR
-        IncrementalContainerReportProto icr =
-            IncrementalContainerReportProto.newBuilder().addReport(
-                ozoneContainer.getContainerSet().getContainer(containerID)
-                    .getContainerReport()).build();
-        context.addReport(icr);
-        context.getParent().triggerHeartbeat();
+        // If the container is part of open pipeline, close it via write channel
+        if (ozoneContainer.getWriteChannel()
+            .isExist(closeCommand.getPipelineID())) {
+          ContainerCommandRequestProto request =
+              getContainerCommandRequestProto(datanodeDetails,
+                  closeCommand.getContainerID());
+          ozoneContainer.getWriteChannel().submitRequest(
+              request, closeCommand.getPipelineID());
+          return;
+        }
+
+        // The container is not part of any open pipeline.
+        // QUASI_CLOSE the container using ContainerController.
+        controller.quasiCloseContainer(containerId);
+      } catch (NotLeaderException e) {
+        LOG.debug("Follower cannot close container #{}.", containerId);
+      } catch (IOException e) {
+        LOG.error("Can't close container #{}", containerId, e);
+      } finally {
+        long endTime = Time.monotonicNow();
+        totalTime += endTime - startTime;
       }
-    } catch (Exception e) {
-      if (e instanceof NotLeaderException) {
-        // If the particular datanode is not the Ratis leader, the close
-        // container command will not be executed by the follower but will be
-        // executed by Ratis stateMachine transactions via leader to follower.
-        // There can also be case where the datanode is in candidate state.
-        // In these situations, NotLeaderException is thrown.
-        LOG.info("Follower cannot close the container {}.", containerID);
-      } else {
-        LOG.error("Can't close container " + containerID, e);
-      }
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
+    } catch (InvalidProtocolBufferException ex) {
+      LOG.error("Exception while closing container", ex);
     }
   }
 
+  private ContainerCommandRequestProto getContainerCommandRequestProto(
+      final DatanodeDetails datanodeDetails, final long containerId) {
+    final ContainerCommandRequestProto.Builder command =
+        ContainerCommandRequestProto.newBuilder();
+    command.setCmdType(ContainerProtos.Type.CloseContainer);
+    command.setContainerID(containerId);
+    command.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    command.setTraceID(UUID.randomUUID().toString());
+    command.setDatanodeUuid(datanodeDetails.getUuidString());
+    return command.build();
+  }
+
   /**
    * Returns the command type that this command handler handles.
   *
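The rewritten handler boils down to a three-way decision. A condensed sketch of the control flow (names mirror the patch; protobuf parsing, logging, and error handling elided; illustrative only):

    void handleClose(OzoneContainer ozoneContainer, long containerId,
        HddsProtos.PipelineID pipelineId, ContainerCommandRequestProto request)
        throws IOException {
      ContainerController controller = ozoneContainer.getController();
      // 1. OPEN -> CLOSING (no-op if the replica already left OPEN).
      controller.markContainerForClose(containerId);
      if (ozoneContainer.getWriteChannel().isExist(pipelineId)) {
        // 2. Pipeline still open: route the close through Ratis so every
        //    replica applies it and ends up CLOSED consistently.
        ozoneContainer.getWriteChannel().submitRequest(request, pipelineId);
      } else {
        // 3. No open pipeline, so Ratis cannot coordinate the close:
        //    QUASI_CLOSE locally and let SCM reconcile the replicas later.
        controller.quasiCloseContainer(containerId);
      }
    }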
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index acd4af978e..9918f9dfda 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -111,7 +111,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
     try {
       ContainerReportsProto containerReport = datanodeContainerManager
-          .getContainerReport();
+          .getController().getContainerReport();
       NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
       PipelineReportsProto pipelineReportsProto =
           datanodeContainerManager.getPipelineReport();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index ab9f42fc13..992f3cb553 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -140,6 +140,11 @@ public void submitRequest(ContainerCommandRequestProto request,
     }
   }
 
+  @Override
+  public boolean isExist(HddsProtos.PipelineID pipelineId) {
+    return PipelineID.valueOf(id).getProtobuf().equals(pipelineId);
+  }
+
   @Override
   public List<PipelineReport> getPipelineReport() {
     return Collections.singletonList(
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 8c3fa5c8dd..4e0d34384c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -53,6 +53,13 @@ void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID)
       throws IOException;
 
+  /**
+   * Returns true if the given pipeline exists.
+   *
+   * @return true if pipeline present, else false
+   */
+  boolean isExist(HddsProtos.PipelineID pipelineId);
+
   /**
    * Get pipeline report for the XceiverServer instance.
    * @return list of report for each pipeline.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 7bf4da9b69..434d330ff6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -86,7 +86,8 @@
  * Ozone containers.
  */
 public final class XceiverServerRatis implements XceiverServerSpi {
-  static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
+  private static final Logger LOG = LoggerFactory
+      .getLogger(XceiverServerRatis.class);
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
 
   private static long nextCallId() {
@@ -457,6 +458,21 @@ private void handlePipelineFailure(RaftGroupId groupId,
         + ".Reason : " + action.getClosePipeline().getDetailedReason());
   }
 
+  @Override
+  public boolean isExist(HddsProtos.PipelineID pipelineId) {
+    try {
+      for (RaftGroupId groupId : server.getGroupIds()) {
+        if (PipelineID.valueOf(
+            groupId.getUuid()).getProtobuf().equals(pipelineId)) {
+          return true;
+        }
+      }
+      return false;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
   @Override
   public List<PipelineReport> getPipelineReport() {
     try {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index b82c12f0ec..f725d22092 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerDataProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerType;
 import org.apache.hadoop.hdds.protocol.proto
@@ -267,29 +269,47 @@ public void delete(boolean forceDelete)
   }
 
   @Override
-  public void close() throws StorageContainerException {
+  public void markContainerForClose() throws StorageContainerException {
+    updateContainerData(() ->
+        containerData.setState(ContainerDataProto.State.CLOSING));
+  }
 
-    //TODO: writing .container file and compaction can be done
-    // asynchronously, otherwise rpc call for this will take a lot of time to
-    // complete this action
+  @Override
+  public void quasiClose() throws StorageContainerException {
+    updateContainerData(containerData::quasiCloseContainer);
+  }
+
+  @Override
+  public void close() throws StorageContainerException {
+    updateContainerData(containerData::closeContainer);
+    // It is ok if this operation takes a bit of time.
+    // Close container is not expected to be instantaneous.
+    compactDB();
+  }
+
+  private void updateContainerData(Runnable update)
+      throws StorageContainerException {
+    ContainerDataProto.State oldState = null;
     try {
       writeLock();
-
-      containerData.closeContainer();
+      oldState = containerData.getState();
+      update.run();
       File containerFile = getContainerFile();
       // update the new container data to .container File
       updateContainerFile(containerFile);
     } catch (StorageContainerException ex) {
-      // Failed to update .container file. Reset the state to CLOSING
-      containerData.setState(ContainerProtos.ContainerDataProto.State.CLOSING);
+      if (oldState != null) {
+        // Failed to update .container file. Reset the state to CLOSING
+        containerData.setState(oldState);
+      }
       throw ex;
     } finally {
       writeUnlock();
    }
+  }
 
-    // It is ok if this operation takes a bit of time.
-    // Close container is not expected to be instantaneous.
+  private void compactDB() throws StorageContainerException {
     try {
       MetadataStore db = BlockUtils.getDB(containerData, config);
       db.compactDB();
@@ -549,6 +569,9 @@ private ContainerReplicaProto.State getHddsState()
     case CLOSING:
       state = ContainerReplicaProto.State.CLOSING;
       break;
+    case QUASI_CLOSED:
+      state = ContainerReplicaProto.State.QUASI_CLOSED;
+      break;
     case CLOSED:
       state = ContainerReplicaProto.State.CLOSED;
       break;
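The refactoring above funnels all three transitions through one updateContainerData helper: take the write lock, remember the old state, apply the in-memory change, persist the .container file, and roll the state back if persistence fails. The same idiom in isolation (a simplified sketch with a hypothetical method name, not patch code):

    private void updateAndPersist(Runnable update)
        throws StorageContainerException {
      writeLock();
      ContainerDataProto.State oldState = containerData.getState();
      try {
        update.run();                             // in-memory transition
        updateContainerFile(getContainerFile());  // persist to .container file
      } catch (StorageContainerException ex) {
        containerData.setState(oldState);         // roll back on failure
        throw ex;
      } finally {
        writeUnlock();
      }
    }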
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index f970c72ee5..4a5ef18259 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -33,7 +33,7 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerDataProto;
+    .ContainerDataProto.State;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -57,6 +57,7 @@
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
@@ -109,9 +110,9 @@ public class KeyValueHandler extends Handler {
   private final long maxContainerSize;
   private final AutoCloseableLock handlerLock;
 
-  public KeyValueHandler(Configuration config, ContainerSet contSet,
-      VolumeSet volSet, ContainerMetrics metrics) {
-    super(config, contSet, volSet, metrics);
+  public KeyValueHandler(Configuration config, StateContext context,
+      ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) {
+    super(config, context, contSet, volSet, metrics);
     containerType = ContainerType.KeyValueContainer;
     blockManager = new BlockManagerImpl(config);
     chunkManager = new ChunkManagerImpl();
@@ -372,20 +373,10 @@ ContainerCommandResponseProto handleCloseContainer(
           request.getTraceID());
       return ContainerUtils.malformedRequest(request);
     }
-
-    long containerID = kvContainer.getContainerData().getContainerID();
     try {
-      checkContainerOpen(kvContainer);
-      // TODO : The close command should move the container to either quasi
-      // closed/closed depending upon how the closeContainer gets executed.
-      // If it arrives by Standalone, it will be moved to Quasi Closed or
-      // otherwise moved to Closed state if it gets executed via Ratis.
-      kvContainer.close();
+      markContainerForClose(kvContainer);
+      closeContainer(kvContainer);
     } catch (StorageContainerException ex) {
-      if (ex.getResult() == CLOSED_CONTAINER_IO) {
-        LOG.debug("Container {} is already closed.", containerID);
-        return ContainerUtils.getSuccessResponse(request);
-      }
       return ContainerUtils.logAndReturnError(LOG, ex, request);
     } catch (IOException ex) {
       return ContainerUtils.logAndReturnError(LOG,
@@ -745,38 +736,39 @@ ContainerCommandResponseProto handleUnsupportedOp(
 
   private void checkContainerOpen(KeyValueContainer kvContainer)
       throws StorageContainerException {
-    ContainerDataProto.State containerState = kvContainer.getContainerState();
+    final State containerState = kvContainer.getContainerState();
 
-    /**
+    /*
      * In a closing state, follower will receive transactions from leader.
      * Once the leader is put to closing state, it will reject further requests
      * from clients. Only the transactions which happened before the container
     * in the leader goes to closing state, will arrive here even the container
     * might already be in closing state here.
     */
-    if (containerState == ContainerDataProto.State.OPEN
-        || containerState == ContainerDataProto.State.CLOSING) {
+    if (containerState == State.OPEN || containerState == State.CLOSING) {
       return;
-    } else {
-      String msg = "Requested operation not allowed as ContainerState is " +
-          containerState;
-      ContainerProtos.Result result = null;
-      switch (containerState) {
-      case CLOSED:
-        result = CLOSED_CONTAINER_IO;
-        break;
-      case UNHEALTHY:
-        result = CONTAINER_UNHEALTHY;
-        break;
-      case INVALID:
-        result = INVALID_CONTAINER_STATE;
-        break;
-      default:
-        result = CONTAINER_INTERNAL_ERROR;
-      }
-
-      throw new StorageContainerException(msg, result);
     }
+
+    final ContainerProtos.Result result;
+    switch (containerState) {
+    case QUASI_CLOSED:
+      result = CLOSED_CONTAINER_IO;
+      break;
+    case CLOSED:
+      result = CLOSED_CONTAINER_IO;
+      break;
+    case UNHEALTHY:
+      result = CONTAINER_UNHEALTHY;
+      break;
+    case INVALID:
+      result = INVALID_CONTAINER_STATE;
+      break;
+    default:
+      result = CONTAINER_INTERNAL_ERROR;
+    }
+    String msg = "Requested operation not allowed as ContainerState is " +
+        containerState;
+    throw new StorageContainerException(msg, result);
   }
 
   public Container importContainer(long containerID, long maxSize,
@@ -796,4 +788,55 @@ public Container importContainer(long containerID, long maxSize,
 
     return container;
   }
+
+  @Override
+  public void markContainerForClose(Container container)
+      throws IOException {
+    State currentState = container.getContainerState();
+    // Move the container to CLOSING state only if it's OPEN
+    if (currentState == State.OPEN) {
+      container.markContainerForClose();
+      sendICR(container);
+    }
+  }
+
+  @Override
+  public void quasiCloseContainer(Container container)
+      throws IOException {
+    final State state = container.getContainerState();
+    // Quasi close call is idempotent.
+    if (state == State.QUASI_CLOSED) {
+      return;
+    }
+    // The container has to be in CLOSING state.
+    if (state != State.CLOSING) {
+      ContainerProtos.Result error = state == State.INVALID ?
+          INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
+      throw new StorageContainerException("Cannot quasi close container #" +
+          container.getContainerData().getContainerID() + " while in " +
+          state + " state.", error);
+    }
+    container.quasiClose();
+    sendICR(container);
+  }
+
+  @Override
+  public void closeContainer(Container container)
+      throws IOException {
+    final State state = container.getContainerState();
+    // Close call is idempotent.
+    if (state == State.CLOSED) {
+      return;
+    }
+    // The container has to be either in CLOSING or in QUASI_CLOSED state.
+    if (state != State.CLOSING && state != State.QUASI_CLOSED) {
+      ContainerProtos.Result error = state == State.INVALID ?
+          INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
+      throw new StorageContainerException("Cannot close container #" +
+          container.getContainerData().getContainerID() + " while in " +
+          state + " state.", error);
+    }
+    container.close();
+    sendICR(container);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
new file mode 100644
index 0000000000..41b74e7b1b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerDataProto.State;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Control plane for container management in datanode.
+ */
+public class ContainerController {
+
+  private final ContainerSet containerSet;
+  private final Map<ContainerType, Handler> handlers;
+
+  public ContainerController(final ContainerSet containerSet,
+      final Map<ContainerType, Handler> handlers) {
+    this.containerSet = containerSet;
+    this.handlers = handlers;
+  }
+
+  /**
+   * Returns the Container given a container id.
+   *
+   * @param containerId ID of the container
+   * @return Container
+   */
+  public Container getContainer(final long containerId) {
+    return containerSet.getContainer(containerId);
+  }
+
+  /**
+   * Marks the container for closing. Moves the container to CLOSING state.
+   *
+   * @param containerId Id of the container to update
+   * @throws IOException in case of exception
+   */
+  public void markContainerForClose(final long containerId)
+      throws IOException {
+    Container container = containerSet.getContainer(containerId);
+
+    if (container.getContainerState() == State.OPEN) {
+      getHandler(container).markContainerForClose(container);
+    }
+  }
+
+  /**
+   * Returns the container report.
+   *
+   * @return ContainerReportsProto
+   * @throws IOException in case of exception
+   */
+  public ContainerReportsProto getContainerReport()
+      throws IOException {
+    return containerSet.getContainerReport();
+  }
+
+  /**
+   * Quasi closes a container given its id.
+   *
+   * @param containerId Id of the container to quasi close
+   * @throws IOException in case of exception
+   */
+  public void quasiCloseContainer(final long containerId) throws IOException {
+    final Container container = containerSet.getContainer(containerId);
+    getHandler(container).quasiCloseContainer(container);
+  }
+
+  /**
+   * Closes a container given its id.
+   *
+   * @param containerId Id of the container to close
+   * @throws IOException in case of exception
+   */
+  public void closeContainer(final long containerId) throws IOException {
+    final Container container = containerSet.getContainer(containerId);
+    getHandler(container).closeContainer(container);
+  }
+
+  public Container importContainer(final ContainerType type,
+      final long containerId, final long maxSize,
+      final FileInputStream rawContainerStream, final TarContainerPacker packer)
+      throws IOException {
+    return handlers.get(type).importContainer(
+        containerId, maxSize, rawContainerStream, packer);
+  }
+
+  /**
+   * Given a container, returns its handler instance.
+   *
+   * @param container Container
+   * @return handler of the container
+   */
+  private Handler getHandler(final Container container) {
+    return handlers.get(container.getContainerType());
+  }
+}
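ContainerController, added above, becomes the single entry point for container state management on the datanode; callers such as CloseContainerCommandHandler and the replication code no longer reach into dispatcher or handler internals. A small usage sketch (names mirror the patch; the canCloseViaRatis flag is hypothetical):

    ContainerController controller = ozoneContainer.getController();
    controller.markContainerForClose(containerId);  // OPEN -> CLOSING, no-op otherwise
    if (canCloseViaRatis) {
      controller.closeContainer(containerId);       // CLOSING/QUASI_CLOSED -> CLOSED
    } else {
      controller.quasiCloseContainer(containerId);  // CLOSING -> QUASI_CLOSED
    }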
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 9fac3cbc83..a89b50a704 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,18 +19,20 @@
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -47,27 +49,26 @@
 
 import java.io.*;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
-
 /**
  * Ozone main class sets up the network servers and initializes the container
  * layer.
 */
 public class OzoneContainer {
 
-  public static final Logger LOG = LoggerFactory.getLogger(
+  private static final Logger LOG = LoggerFactory.getLogger(
       OzoneContainer.class);
 
   private final HddsDispatcher hddsDispatcher;
-  private final DatanodeDetails dnDetails;
+  private final Map<ContainerType, Handler> handlers;
   private final OzoneConfiguration config;
   private final VolumeSet volumeSet;
   private final ContainerSet containerSet;
-  private final Map<ReplicationType, XceiverServerSpi> servers;
+  private final XceiverServerSpi writeChannel;
+  private final XceiverServerSpi readChannel;
+  private final ContainerController controller;
 
   /**
    * Construct OzoneContainer object.
@@ -78,31 +79,42 @@ public class OzoneContainer {
    */
   public OzoneContainer(DatanodeDetails datanodeDetails,
       OzoneConfiguration conf, StateContext context) throws IOException {
-    this.dnDetails = datanodeDetails;
     this.config = conf;
     this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
     this.containerSet = new ContainerSet();
     buildContainerSet();
-    hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
-        context);
-    servers = new HashMap<>();
-    servers.put(ReplicationType.STAND_ALONE,
-        new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher,
-            createReplicationService()));
-    servers.put(ReplicationType.RATIS, XceiverServerRatis
-        .newXceiverServerRatis(datanodeDetails, config, hddsDispatcher,
-            context));
+    final ContainerMetrics metrics = ContainerMetrics.create(conf);
+    this.handlers = Maps.newHashMap();
+    for (ContainerType containerType : ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(
+              containerType, conf, context, containerSet, volumeSet, metrics));
+    }
+    this.hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
+        handlers, context, metrics);
+
+    /*
+     * ContainerController is the control plane
+     * XceiverServerRatis is the write channel
+     * XceiverServerGrpc is the read channel
+     */
+    this.controller = new ContainerController(containerSet, handlers);
+    this.writeChannel = XceiverServerRatis.newXceiverServerRatis(
+        datanodeDetails, config, hddsDispatcher, context);
+    this.readChannel = new XceiverServerGrpc(
+        datanodeDetails, config, hddsDispatcher, createReplicationService());
+
   }
 
   private GrpcReplicationService createReplicationService() {
     return new GrpcReplicationService(
-        new OnDemandContainerReplicationSource(containerSet));
+        new OnDemandContainerReplicationSource(controller));
   }
 
   /**
   * Build's container map.
   */
-  public void buildContainerSet() {
+  private void buildContainerSet() {
     Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
         .iterator();
     ArrayList<Thread> volumeThreads = new ArrayList<Thread>();
@@ -111,7 +123,6 @@ public void buildContainerSet() {
     // And also handle disk failure tolerance need to be added
     while (volumeSetIterator.hasNext()) {
       HddsVolume volume = volumeSetIterator.next();
-      File hddsVolumeRootDir = volume.getHddsRootDir();
       Thread thread = new Thread(new ContainerReader(volumeSet, volume,
           containerSet, config));
       thread.start();
@@ -135,9 +146,8 @@ public void buildContainerSet() {
    */
   public void start() throws IOException {
     LOG.info("Attempting to start container services.");
-    for (XceiverServerSpi serverinstance : servers.values()) {
-      serverinstance.start();
-    }
+    writeChannel.start();
+    readChannel.start();
     hddsDispatcher.init();
   }
 
@@ -147,9 +157,8 @@ public void start() throws IOException {
   public void stop() {
     //TODO: at end of container IO integration work.
     LOG.info("Attempting to stop container services.");
-    for(XceiverServerSpi serverinstance: servers.values()) {
-      serverinstance.stop();
-    }
+    writeChannel.stop();
+    readChannel.stop();
     hddsDispatcher.shutdown();
   }
 
@@ -163,58 +172,24 @@ public ContainerSet getContainerSet() {
    * @return - container report.
    * @throws IOException
    */
-  public StorageContainerDatanodeProtocolProtos.ContainerReportsProto
-      getContainerReport() throws IOException {
-    return this.containerSet.getContainerReport();
-  }
 
   public PipelineReportsProto getPipelineReport() {
     PipelineReportsProto.Builder pipelineReportsProto =
-        PipelineReportsProto.newBuilder();
-    for (XceiverServerSpi serverInstance : servers.values()) {
-      pipelineReportsProto
-          .addAllPipelineReport(serverInstance.getPipelineReport());
-    }
+        PipelineReportsProto.newBuilder();
+    pipelineReportsProto.addAllPipelineReport(writeChannel.getPipelineReport());
     return pipelineReportsProto.build();
   }
 
-  /**
-   * Submit ContainerRequest.
-   * @param request
-   * @param replicationType
-   * @param pipelineID
-   */
-  public void submitContainerRequest(
-      ContainerProtos.ContainerCommandRequestProto request,
-      ReplicationType replicationType,
-      PipelineID pipelineID) throws IOException {
-    LOG.info("submitting {} request over {} server for container {}",
-        request.getCmdType(), replicationType, request.getContainerID());
-    Preconditions.checkState(servers.containsKey(replicationType));
-    servers.get(replicationType).submitRequest(request, pipelineID);
+  public XceiverServerSpi getWriteChannel() {
+    return writeChannel;
   }
 
-  private int getPortByType(ReplicationType replicationType) {
-    return servers.containsKey(replicationType) ?
-        servers.get(replicationType).getIPCPort() : INVALID_PORT;
+  public XceiverServerSpi getReadChannel() {
+    return readChannel;
   }
 
-  /**
-   * Returns the container servers IPC port.
-   *
-   * @return Container servers IPC port.
-   */
-  public int getContainerServerPort() {
-    return getPortByType(ReplicationType.STAND_ALONE);
-  }
-
-  /**
-   * Returns the Ratis container Server IPC port.
-   *
-   * @return Ratis port.
-   */
-  public int getRatisContainerServerPort() {
-    return getPortByType(ReplicationType.RATIS);
+  public ContainerController getController() {
+    return controller;
   }
 
   /**
@@ -230,11 +205,6 @@ public ContainerDispatcher getDispatcher() {
     return this.hddsDispatcher;
   }
 
-  @VisibleForTesting
-  public XceiverServerSpi getServer(ReplicationType replicationType) {
-    return servers.get(replicationType);
-  }
-
   public VolumeSet getVolumeSet() {
     return volumeSet;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 5ef584184a..e8f0d17366 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -28,9 +28,8 @@
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
 import org.slf4j.Logger;
@@ -49,7 +48,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
 
   private final ContainerSet containerSet;
-  private final ContainerDispatcher containerDispatcher;
+  private final ContainerController controller;
 
   private final ContainerDownloader downloader;
 
@@ -57,11 +56,11 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
 
   public DownloadAndImportReplicator(
       ContainerSet containerSet,
-      ContainerDispatcher containerDispatcher,
+      ContainerController controller,
       ContainerDownloader downloader,
       TarContainerPacker packer) {
     this.containerSet = containerSet;
-    this.containerDispatcher = containerDispatcher;
+    this.controller = controller;
     this.downloader = downloader;
     this.packer = packer;
   }
@@ -80,10 +79,9 @@ public void importContainer(long containerID, Path tarFilePath) {
     try (FileInputStream tempContainerTarStream = new FileInputStream(
         tarFilePath.toFile())) {
 
-      Handler handler = containerDispatcher.getHandler(
-          originalContainerData.getContainerType());
-
-      Container container = handler.importContainer(containerID,
+      Container container = controller.importContainer(
+          originalContainerData.getContainerType(),
+          containerID,
           originalContainerData.getMaxSize(), tempContainerTarStream,
           packer);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
index d557b548b4..28b8713aa3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -20,12 +20,12 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,13 +39,13 @@ public class OnDemandContainerReplicationSource
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerReplicationSource.class);
 
-  private ContainerSet containerSet;
+  private ContainerController controller;
 
   private ContainerPacker packer = new TarContainerPacker();
 
   public OnDemandContainerReplicationSource(
-      ContainerSet containerSet) {
-    this.containerSet = containerSet;
+      ContainerController controller) {
+    this.controller = controller;
   }
 
   @Override
@@ -57,7 +57,7 @@ public void prepare(long containerId) {
   public void copyData(long containerId, OutputStream destination)
       throws IOException {
 
-    Container container = containerSet.getContainer(containerId);
+    Container container = controller.getContainer(containerId);
 
     Preconditions
         .checkNotNull(container, "Container is not found " + containerId);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index 7849bcdf9e..4fe6ae464f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.ozone.protocol.commands;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -30,14 +29,11 @@
 public class CloseContainerCommand
     extends SCMCommand<CloseContainerCommandProto> {
 
-  private HddsProtos.ReplicationType replicationType;
-  private PipelineID pipelineID;
+  private final PipelineID pipelineID;
 
-  public CloseContainerCommand(long containerID,
-      HddsProtos.ReplicationType replicationType,
-      PipelineID pipelineID) {
+  public CloseContainerCommand(final long containerID,
+      final PipelineID pipelineID) {
     super(containerID);
-    this.replicationType = replicationType;
     this.pipelineID = pipelineID;
   }
 
@@ -65,7 +61,6 @@ public CloseContainerCommandProto getProto() {
     return CloseContainerCommandProto.newBuilder()
         .setContainerID(getId())
         .setCmdId(getId())
-        .setReplicationType(replicationType)
         .setPipelineID(pipelineID.getProtobuf())
         .build();
   }
@@ -74,7 +69,6 @@ public static CloseContainerCommand getFromProtobuf(
       CloseContainerCommandProto closeContainerProto) {
     Preconditions.checkNotNull(closeContainerProto);
     return new CloseContainerCommand(closeContainerProto.getCmdId(),
-        closeContainerProto.getReplicationType(),
         PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()));
   }
 
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 33ea3070f1..9fdef7de78 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -140,8 +140,8 @@ message ContainerReplicaProto {
   enum State {
     OPEN = 1;
     CLOSING = 2;
-    CLOSED = 3;
-    QUASI_CLOSED = 4;
+    QUASI_CLOSED = 3;
+    CLOSED = 4;
     UNHEALTHY = 5;
     INVALID = 6;
   }
@@ -289,9 +289,9 @@ This command asks the datanode to close a specific container.
 */
 message CloseContainerCommandProto {
   required int64 containerID = 1;
-  required hadoop.hdds.ReplicationType replicationType = 2;
+  required PipelineID pipelineID = 2;
+  // cmdId will be removed
   required int64 cmdId = 3;
-  required PipelineID pipelineID = 4;
 }
 
 /**
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 35cda0051d..085e6f9fcd 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.impl;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.StorageUnit;
@@ -25,6 +26,8 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .ContainerProtos.ContainerType;
 import org.apache.hadoop.hdds.protocol.datanode.proto
     .ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -33,7 +36,9 @@
     .WriteChunkRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -47,6 +52,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -77,8 +83,15 @@ public void testContainerCloseActionWhenFull() throws IOException {
       container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
           scmId.toString());
       containerSet.addContainer(container);
+      ContainerMetrics metrics = ContainerMetrics.create(conf);
+      Map<ContainerType, Handler> handlers = Maps.newHashMap();
+      for (ContainerType containerType : ContainerType.values()) {
+        handlers.put(containerType,
+            Handler.getHandlerForContainerType(
+                containerType, conf, null, containerSet, volumeSet, metrics));
+      }
       HddsDispatcher hddsDispatcher = new HddsDispatcher(
-          conf, containerSet, volumeSet, context);
+          conf, containerSet, volumeSet, handlers, context, metrics);
       hddsDispatcher.setScmId(scmId.toString());
       ContainerCommandResponseProto responseOne = hddsDispatcher.dispatch(
           getWriteChunkRequest(dd.getUuidString(), 1L, 1L));
testCreateContainerWithWriteChunk() throws IOException { ContainerSet containerSet = new ContainerSet(); VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf); StateContext context = Mockito.mock(StateContext.class); - HddsDispatcher hddsDispatcher = - new HddsDispatcher(conf, containerSet, volumeSet, context); + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map handlers = Maps.newHashMap(); + for (ContainerType containerType : ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType( + containerType, conf, null, containerSet, volumeSet, metrics)); + } + HddsDispatcher hddsDispatcher = new HddsDispatcher( + conf, containerSet, volumeSet, handlers, context, metrics); hddsDispatcher.setScmId(scmId.toString()); ContainerCommandRequestProto writeChunkRequest = getWriteChunkRequest(dd.getUuidString(), 1L, 1L); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java index b6582952bb..e7632897d1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java @@ -18,8 +18,10 @@ package org.apache.hadoop.ozone.container.common.interfaces; +import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; @@ -32,6 +34,8 @@ import org.junit.rules.Timeout; import org.mockito.Mockito; +import java.util.Map; + /** * Tests Handler interface. */ @@ -50,8 +54,16 @@ public void setup() throws Exception { this.conf = new Configuration(); this.containerSet = Mockito.mock(ContainerSet.class); this.volumeSet = Mockito.mock(VolumeSet.class); - - this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null); + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map handlers = Maps.newHashMap(); + for (ContainerProtos.ContainerType containerType : + ContainerProtos.ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType( + containerType, conf, null, containerSet, volumeSet, metrics)); + } + this.dispatcher = new HddsDispatcher( + conf, containerSet, volumeSet, handlers, null, metrics); } @Test diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java new file mode 100644 index 0000000000..0cf95b78dc --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.rpc.SupportedRpcType; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.UUID; + +/** + * Test cases to verify CloseContainerCommandHandler in datanode. + */ +public class TestCloseContainerCommandHandler { + + private static final StateContext CONTEXT = Mockito.mock(StateContext.class); + private static File testDir; + + + private OzoneContainer getOzoneContainer(final OzoneConfiguration conf, + final DatanodeDetails datanodeDetails) throws IOException { + testDir = GenericTestUtils.getTestDir( + TestCloseContainerCommandHandler.class.getName() + UUID.randomUUID()); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath()); + conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testDir.getPath()); + + return new OzoneContainer(datanodeDetails, conf, CONTEXT); + } + + + @Test + public void testCloseContainerViaRatis() + throws IOException, InterruptedException { + final OzoneConfiguration conf = new OzoneConfiguration(); + final DatanodeDetails datanodeDetails = randomDatanodeDetails(); + final OzoneContainer container = getOzoneContainer(conf, datanodeDetails); + container.getDispatcher().setScmId(UUID.randomUUID().toString()); + container.start(); + // Give some time for ratis for leader election. 
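+ // Bootstrap a single-node Raft group on this datanode; the group id is + // derived from the pipeline id so that the close command can later be + // routed through the Ratis write channel.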
+ final PipelineID pipelineID = PipelineID.randomId(); + final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId()); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf); + final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails); + final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId, + Collections.singleton(datanodeDetails)); + final RaftClient client = RatisHelper.newRaftClient( + SupportedRpcType.GRPC, peer, retryPolicy); + System.out.println(client.groupAdd(group, peer.getId()).isSuccess()); + Thread.sleep(2000); + final ContainerID containerId = ContainerID.valueof(1); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.CreateContainer); + request.setContainerID(containerId.getId()); + request.setCreateContainer( + ContainerProtos.CreateContainerRequestProto.getDefaultInstance()); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid(datanodeDetails.getUuidString()); + container.getWriteChannel().submitRequest( + request.build(), pipelineID.getProtobuf()); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, + container.getContainerSet().getContainer( + containerId.getId()).getContainerState()); + + // We have created a container via ratis. Now close the container on ratis. + final CloseContainerCommandHandler closeHandler = + new CloseContainerCommandHandler(); + final CloseContainerCommand command = new CloseContainerCommand( + containerId.getId(), pipelineID); + final DatanodeStateMachine datanodeStateMachine = Mockito.mock( + DatanodeStateMachine.class); + + Mockito.when(datanodeStateMachine.getDatanodeDetails()) + .thenReturn(datanodeDetails); + Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine); + + closeHandler.handle(command, container, CONTEXT, null); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + container.getContainerSet().getContainer( + containerId.getId()).getContainerState()); + + Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat(); + container.stop(); + } + + @Test + public void testCloseContainerViaStandalone() + throws IOException, InterruptedException { + final OzoneConfiguration conf = new OzoneConfiguration(); + final DatanodeDetails datanodeDetails = randomDatanodeDetails(); + final OzoneContainer container = getOzoneContainer(conf, datanodeDetails); + container.getDispatcher().setScmId(UUID.randomUUID().toString()); + container.start(); + // Give some time for ratis for leader election. 
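+ // Same single-node Raft group bootstrap as in the Ratis test; the container + // is created through Ratis here as well, but the close command below carries + // a random pipeline id, which forces the quasi-close path.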
+ final PipelineID pipelineID = PipelineID.randomId(); + final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId()); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf); + final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails); + final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId, + Collections.singleton(datanodeDetails)); + final RaftClient client = RatisHelper.newRaftClient( + SupportedRpcType.GRPC, peer, retryPolicy); + System.out.println(client.groupAdd(group, peer.getId()).isSuccess()); + Thread.sleep(2000); + final ContainerID containerId = ContainerID.valueof(2); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.CreateContainer); + request.setContainerID(containerId.getId()); + request.setCreateContainer( + ContainerProtos.CreateContainerRequestProto.getDefaultInstance()); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid(datanodeDetails.getUuidString()); + container.getWriteChannel().submitRequest( + request.build(), pipelineID.getProtobuf()); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, + container.getContainerSet().getContainer( + containerId.getId()).getContainerState()); + + // We have created a container via ratis. Now quasi close it + final CloseContainerCommandHandler closeHandler = + new CloseContainerCommandHandler(); + // Specify a pipeline which doesn't exist in the datanode. + final CloseContainerCommand command = new CloseContainerCommand( + containerId.getId(), PipelineID.randomId()); + final DatanodeStateMachine datanodeStateMachine = Mockito.mock( + DatanodeStateMachine.class); + + Mockito.when(datanodeStateMachine.getDatanodeDetails()) + .thenReturn(datanodeDetails); + Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine); + + closeHandler.handle(command, container, CONTEXT, null); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED, + container.getContainerSet().getContainer( + containerId.getId()).getContainerState()); + + Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat(); + container.stop(); + } + + /** + * Creates a random DatanodeDetails. 
+ * @return DatanodeDetails + */ + private static DatanodeDetails randomDatanodeDetails() { + String ipAddress = "127.0.0.1"; + DatanodeDetails.Port containerPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.STANDALONE, 0); + DatanodeDetails.Port ratisPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.RATIS, 0); + DatanodeDetails.Port restPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.REST, 0); + DatanodeDetails.Builder builder = DatanodeDetails.newBuilder(); + builder.setUuid(UUID.randomUUID().toString()) + .setHostName("localhost") + .setIpAddress(ipAddress) + .addPort(containerPort) + .addPort(ratisPort) + .addPort(restPort); + return builder.build(); + } + + @AfterClass + public static void teardown() throws IOException { + FileUtils.deleteDirectory(testDir); + } +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 7fc065f2fc..265fb18d7f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -54,6 +54,7 @@ import java.io.File; +import java.io.IOException; import java.util.UUID; /** @@ -224,7 +225,7 @@ public void testVolumeSetInKeyValueHandler() throws Exception{ interval[0] = 2; ContainerMetrics metrics = new ContainerMetrics(interval); VolumeSet volumeSet = new VolumeSet(UUID.randomUUID().toString(), conf); - KeyValueHandler keyValueHandler = new KeyValueHandler(conf, cset, + KeyValueHandler keyValueHandler = new KeyValueHandler(conf, null, cset, volumeSet, metrics); assertEquals("org.apache.hadoop.ozone.container.common" + ".volume.RoundRobinVolumeChoosingPolicy", @@ -235,7 +236,7 @@ public void testVolumeSetInKeyValueHandler() throws Exception{ conf.set(HDDS_DATANODE_VOLUME_CHOOSING_POLICY, "org.apache.hadoop.ozone.container.common.impl.HddsDispatcher"); try { - new KeyValueHandler(conf, cset, volumeSet, metrics); + new KeyValueHandler(conf, null, cset, volumeSet, metrics); } catch (RuntimeException ex) { GenericTestUtils.assertExceptionContains("class org.apache.hadoop" + ".ozone.container.common.impl.HddsDispatcher not org.apache" + @@ -261,7 +262,7 @@ private ContainerCommandRequestProto getDummyCommandRequestProto( @Test - public void testCloseInvalidContainer() { + public void testCloseInvalidContainer() throws IOException { long containerID = 1234L; Configuration conf = new Configuration(); KeyValueContainerData kvData = new KeyValueContainerData(containerID, @@ -282,6 +283,7 @@ public void testCloseInvalidContainer() { Mockito.when(handler.handleCloseContainer(any(), any())) .thenCallRealMethod(); + doCallRealMethod().when(handler).closeContainer(any()); // Closing invalid container should return error response. 
ContainerProtos.ContainerCommandResponseProto response = handler.handleCloseContainer(closeContainerRequest, container); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 6f96f4b99e..34030e8473 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -172,7 +172,7 @@ private synchronized void preAllocateContainers(int count, LOG.warn("Unable to allocate container."); } } catch (IOException ex) { - LOG.warn("Unable to allocate container: {}", ex); + LOG.warn("Unable to allocate container.", ex); } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 719d76334d..fd73711003 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -74,8 +74,8 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) { if (container.getState() == LifeCycleState.CLOSING) { final CloseContainerCommand closeContainerCommand = - new CloseContainerCommand(containerID.getId(), - container.getReplicationType(), container.getPipelineID()); + new CloseContainerCommand( + containerID.getId(), container.getPipelineID()); getNodes(container).forEach(node -> publisher.fireEvent( DATANODE_COMMAND, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java index 631283c2bc..1ff6655976 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java @@ -19,7 +19,6 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -960,7 +959,7 @@ public void testHandlingSCMCommandEvent() throws IOException { TestUtils.getRandomPipelineReports()); eq.fireEvent(DATANODE_COMMAND, new CommandForDatanode<>(datanodeDetails.getUuid(), - new CloseContainerCommand(1L, ReplicationType.STAND_ALONE, + new CloseContainerCommand(1L, PipelineID.randomId()))); eq.processAll(1000L); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 0f35607e6d..16c9f22941 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import 
org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; @@ -66,6 +65,7 @@ import org.apache.hadoop.ozone.container.common.states.endpoint .VersionEndpointTask; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.test.GenericTestUtils; @@ -75,6 +75,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; import static org.mockito.Mockito.mock; @@ -309,8 +310,10 @@ private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress, OzoneContainer ozoneContainer = mock(OzoneContainer.class); when(ozoneContainer.getNodeReport()).thenReturn(TestUtils .createNodeReport(getStorageReports(UUID.randomUUID()))); - when(ozoneContainer.getContainerReport()).thenReturn( + ContainerController controller = Mockito.mock(ContainerController.class); + when(controller.getContainerReport()).thenReturn( TestUtils.getRandomContainerReports(10)); + when(ozoneContainer.getController()).thenReturn(controller); when(ozoneContainer.getPipelineReport()).thenReturn( TestUtils.getRandomPipelineReports()); RegisterEndpointTask endpointTask = @@ -433,7 +436,6 @@ private void addScmCommands() { .setCloseContainerCommandProto( CloseContainerCommandProto.newBuilder().setCmdId(1) .setContainerID(1) - .setReplicationType(ReplicationType.RATIS) .setPipelineID(PipelineID.randomId().getProtobuf()) .build()) .setCommandType(Type.closeContainerCommand) diff --git a/hadoop-ozone/dist/src/main/smoketest/basic/basic.robot b/hadoop-ozone/dist/src/main/smoketest/basic/basic.robot index fb58520539..0162f9ea1a 100644 --- a/hadoop-ozone/dist/src/main/smoketest/basic/basic.robot +++ b/hadoop-ozone/dist/src/main/smoketest/basic/basic.robot @@ -42,6 +42,6 @@ Check webui static resources Should contain ${result} 200 Start freon testing - ${result} = Execute ozone freon randomkeys --numOfVolumes 5 --numOfBuckets 5 --numOfKeys 5 --numOfThreads 10 + ${result} = Execute ozone freon randomkeys --numOfVolumes 5 --numOfBuckets 5 --numOfKeys 5 --numOfThreads 1 Wait Until Keyword Succeeds 3min 10sec Should contain ${result} Number of Keys added: 125 Should Not Contain ${result} ERROR diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index 3083660676..6427daed29 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -176,14 +176,14 @@ public void testContainerRandomPort() throws IOException { TestUtils.randomDatanodeDetails(), ozoneConf) ) { HashSet ports = new HashSet(); - assertTrue(ports.add(sm1.getContainer().getContainerServerPort())); - assertTrue(ports.add(sm2.getContainer().getContainerServerPort())); - assertTrue(ports.add(sm3.getContainer().getContainerServerPort())); + assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort())); + assertTrue(ports.add(sm2.getContainer().getReadChannel().getIPCPort())); + 
assertTrue(ports.add(sm3.getContainer().getReadChannel().getIPCPort())); // Assert that ratis is also on a different port. - assertTrue(ports.add(sm1.getContainer().getRatisContainerServerPort())); - assertTrue(ports.add(sm2.getContainer().getRatisContainerServerPort())); - assertTrue(ports.add(sm3.getContainer().getRatisContainerServerPort())); + assertTrue(ports.add(sm1.getContainer().getWriteChannel().getIPCPort())); + assertTrue(ports.add(sm2.getContainer().getWriteChannel().getIPCPort())); + assertTrue(ports.add(sm3.getContainer().getWriteChannel().getIPCPort())); } @@ -199,9 +199,9 @@ public void testContainerRandomPort() throws IOException { TestUtils.randomDatanodeDetails(), ozoneConf) ) { HashSet ports = new HashSet(); - assertTrue(ports.add(sm1.getContainer().getContainerServerPort())); - assertFalse(ports.add(sm2.getContainer().getContainerServerPort())); - assertFalse(ports.add(sm3.getContainer().getContainerServerPort())); + assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort())); + assertFalse(ports.add(sm2.getContainer().getReadChannel().getIPCPort())); + assertFalse(ports.add(sm3.getContainer().getReadChannel().getIPCPort())); assertEquals(ports.iterator().next().intValue(), conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT)); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 405ce8ed4a..abd60a1a5c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -134,7 +134,7 @@ public void testBlockWritesWithFlushAndClose() throws Exception { .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + waitForContainerClose(keyName, key); key.write(data); key.flush(); key.close(); @@ -162,11 +162,12 @@ public void testBlockWritesCloseConsistency() throws Exception { Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setBucketName(bucketName) + .setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + waitForContainerClose(keyName, key); key.close(); // read the key from OM again and match the length.The length will still // be the equal to the original data size. @@ -199,7 +200,7 @@ public void testMultiBlockWrites() throws Exception { .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + waitForContainerClose(keyName, key); // write 1 more block worth of data. 
It will fail and new block will be // allocated key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize) @@ -249,7 +250,7 @@ public void testMultiBlockWrites2() throws Exception { .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + waitForContainerClose(keyName, key); key.close(); // read the key from OM again and match the length.The length will still @@ -291,7 +292,7 @@ public void testMultiBlockWrites3() throws Exception { .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + waitForContainerClose(keyName, key); // write 3 more chunks worth of data. It will fail and new block will be // allocated. This write completes 4 blocks worth of data written to key data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen); @@ -321,7 +322,7 @@ public void testMultiBlockWrites3() throws Exception { } private void waitForContainerClose(String keyName, - OzoneOutputStream outputStream, HddsProtos.ReplicationType type) + OzoneOutputStream outputStream) throws Exception { ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) outputStream.getOutputStream(); @@ -332,11 +333,10 @@ private void waitForContainerClose(String keyName, containerIdList.add(info.getContainerID()); } Assert.assertTrue(!containerIdList.isEmpty()); - waitForContainerClose(type, containerIdList.toArray(new Long[0])); + waitForContainerClose(containerIdList.toArray(new Long[0])); } - private void waitForContainerClose(HddsProtos.ReplicationType type, - Long... containerIdList) + private void waitForContainerClose(Long... containerIdList) throws ContainerNotFoundException, PipelineNotFoundException, TimeoutException, InterruptedException { List pipelineList = new ArrayList<>(); @@ -358,7 +358,7 @@ private void waitForContainerClose(HddsProtos.ReplicationType type, // send the order to close the container cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(details.getUuid(), - new CloseContainerCommand(containerID, type, pipeline.getId())); + new CloseContainerCommand(containerID, pipeline.getId())); } } int index = 0; @@ -413,7 +413,7 @@ public void testDiscardPreallocatedBlocks() throws Exception { .getPipeline(container.getPipelineID()); List datanodes = pipeline.getNodes(); Assert.assertEquals(1, datanodes.size()); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + waitForContainerClose(keyName, key); dataString = ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize)); data = dataString.getBytes(UTF_8); @@ -459,7 +459,7 @@ public void testBlockWriteViaRatis() throws Exception { .build(); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + waitForContainerClose(keyName, key); // Again Write the Data. 
This will throw an exception which will be handled // and new blocks will be allocated key.write(data); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java index 30e35361b3..695b3f1915 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java @@ -22,9 +22,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.ObjectStore; @@ -32,7 +34,6 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; @@ -63,8 +64,9 @@ public class TestCloseContainerByPipeline { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, "1"); cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(3).build(); + .setNumDatanodes(9).build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); @@ -86,7 +88,7 @@ public static void shutdown() { @Test public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception { OzoneOutputStream key = objectStore.getVolume("test").getBucket("test") - .createKey("testCloseContainer", 1024, ReplicationType.STAND_ALONE, + .createKey("standalone", 1024, ReplicationType.RATIS, ReplicationFactor.ONE); key.write("standalone".getBytes()); key.close(); @@ -94,10 +96,9 @@ public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception { //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName("test").setBucketName("test") - .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024) - .setKeyName("testCloseContainer").build(); - + .setKeyName("standalone").build(); OmKeyLocationInfo omKeyLocationInfo = cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions() .get(0).getBlocksLatestVersionOnly().get(0); @@ -127,8 +128,7 @@ public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception { //send the order to close the container cluster.getStorageContainerManager().getScmNodeManager() 
.addDatanodeCommand(datanodeDetails.getUuid(), - new CloseContainerCommand(containerID, - HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId())); + new CloseContainerCommand(containerID, pipeline.getId())); GenericTestUtils .waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails), 500, 5 * 1000); @@ -142,7 +142,7 @@ public void testCloseContainerViaStandAlone() throws IOException, TimeoutException, InterruptedException { OzoneOutputStream key = objectStore.getVolume("test").getBucket("test") - .createKey("standalone", 1024, ReplicationType.STAND_ALONE, + .createKey("standalone", 1024, ReplicationType.RATIS, ReplicationFactor.ONE); key.write("standalone".getBytes()); key.close(); @@ -150,7 +150,7 @@ public void testCloseContainerViaStandAlone() //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName("test").setBucketName("test") - .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024) .setKeyName("standalone").build(); @@ -170,30 +170,20 @@ public void testCloseContainerViaStandAlone() Assert .assertFalse(isContainerClosed(cluster, containerID, datanodeDetails)); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG); - //send the order to close the container + // Send the order to close the container, give random pipeline id so that + // the container will not be closed via RATIS cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(datanodeDetails.getUuid(), - new CloseContainerCommand(containerID, - HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId())); - - // The log will appear after the state changed to closed in standalone, - // wait for the log to ensure the operation has been done. - GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains( - "submitting CloseContainer request over STAND_ALONE server for" - + " container " + containerID), 500, 5 * 1000); + new CloseContainerCommand(containerID, PipelineID.randomId())); //double check if it's really closed (waitFor also throws an exception) - Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails)); - Assert.assertTrue(logCapturer.getOutput().contains( - "submitting CloseContainer request over STAND_ALONE server for" - + " container " + containerID)); - // Make sure it was really closed via StandAlone not Ratis server - Assert.assertFalse((logCapturer.getOutput().contains( - "submitting CloseContainer request over RATIS server for container " - + containerID))); - logCapturer.stopCapturing(); + // TODO: change the below line after implementing QUASI_CLOSED to CLOSED + // logic. 
The container will be QUASI closed as of now + GenericTestUtils + .waitFor(() -> isContainerQuasiClosed( + cluster, containerID, datanodeDetails), 500, 5 * 1000); + Assert.assertTrue( + isContainerQuasiClosed(cluster, containerID, datanodeDetails)); } @Test @@ -224,18 +214,14 @@ public void testCloseContainerViaRatis() throws IOException, List datanodes = pipeline.getNodes(); Assert.assertEquals(3, datanodes.size()); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG); - for (DatanodeDetails details : datanodes) { Assert.assertFalse(isContainerClosed(cluster, containerID, details)); //send the order to close the container cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(details.getUuid(), - new CloseContainerCommand(containerID, - HddsProtos.ReplicationType.RATIS, pipeline.getId())); + new CloseContainerCommand(containerID, pipeline.getId())); } - + // Make sure that it is CLOSED for (DatanodeDetails datanodeDetails : datanodes) { GenericTestUtils.waitFor( () -> isContainerClosed(cluster, containerID, datanodeDetails), 500, @@ -244,14 +230,6 @@ public void testCloseContainerViaRatis() throws IOException, Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails)); } - // Make sure it was really closed via Ratis not STAND_ALONE server - Assert.assertFalse(logCapturer.getOutput().contains( - "submitting CloseContainer request over STAND_ALONE " - + "server for container " + containerID)); - Assert.assertTrue((logCapturer.getOutput().contains( - "submitting CloseContainer request over RATIS server for container " - + containerID))); - logCapturer.stopCapturing(); } private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID, @@ -267,4 +245,18 @@ private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID, } return false; } + + private Boolean isContainerQuasiClosed(MiniOzoneCluster miniCluster, + long containerID, DatanodeDetails datanode) { + ContainerData containerData; + for (HddsDatanodeService datanodeService : miniCluster.getHddsDatanodes()) { + if (datanode.equals(datanodeService.getDatanodeDetails())) { + containerData = + datanodeService.getDatanodeStateMachine().getContainer() + .getContainerSet().getContainer(containerID).getContainerData(); + return containerData.isQuasiClosed(); + } + } + return false; + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java index 9cf51d1715..588a30143d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java @@ -95,8 +95,7 @@ public void test() throws IOException, TimeoutException, InterruptedException, //send the order to close the container cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(datanodeDetails.getUuid(), - new CloseContainerCommand(containerId.getId(), - HddsProtos.ReplicationType.STAND_ALONE, pipeline.getId())); + new CloseContainerCommand(containerId.getId(), pipeline.getId())); GenericTestUtils.waitFor(() -> isContainerClosed(cluster, containerId.getId()), diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java index d4f7ae55fc..527ab453c9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import com.google.common.collect.Maps; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.ScmConfigKeys; @@ -36,12 +37,15 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource; import org.apache.hadoop.test.GenericTestUtils; @@ -49,6 +53,7 @@ import org.junit.Test; import java.io.File; +import java.util.Map; import java.util.UUID; /** @@ -57,9 +62,9 @@ public class TestContainerMetrics { private GrpcReplicationService createReplicationService( - ContainerSet containerSet) { + ContainerController controller) { return new GrpcReplicationService( - new OnDemandContainerReplicationSource(containerSet)); + new OnDemandContainerReplicationSource(controller)); } @Test @@ -85,12 +90,21 @@ public void testContainerMetrics() throws Exception { VolumeSet volumeSet = new VolumeSet( datanodeDetails.getUuidString(), conf); ContainerSet containerSet = new ContainerSet(); + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map handlers = Maps.newHashMap(); + for (ContainerProtos.ContainerType containerType : + ContainerProtos.ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType( + containerType, conf, null, containerSet, volumeSet, metrics)); + } HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet, - volumeSet, null); + volumeSet, handlers, null, metrics); dispatcher.setScmId(UUID.randomUUID().toString()); server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, - createReplicationService(containerSet)); + createReplicationService(new ContainerController( + containerSet, handlers))); client = new XceiverClientGrpc(pipeline, conf); server.start(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java index 9a3fa1b4a0..3e98594f79 100644 --- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java @@ -18,11 +18,14 @@ package org.apache.hadoop.ozone.container.server; +import com.google.common.collect.Maps; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -57,6 +60,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.apache.ratis.rpc.SupportedRpcType.GRPC; import static org.apache.ratis.rpc.SupportedRpcType.NETTY; @@ -71,15 +75,17 @@ public class TestContainerServer { = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator; private GrpcReplicationService createReplicationService( - ContainerSet containerSet) { + ContainerController containerController) { return new GrpcReplicationService( - new OnDemandContainerReplicationSource(containerSet)); + new OnDemandContainerReplicationSource(containerController)); } @Test public void testClientServer() throws Exception { DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); ContainerSet containerSet = new ContainerSet(); + ContainerController controller = new ContainerController( + containerSet, null); runTestClientServer(1, (pipeline, conf) -> conf .setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, pipeline.getFirstNode() @@ -87,7 +93,7 @@ public void testClientServer() throws Exception { XceiverClientGrpc::new, (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf, new TestContainerDispatcher(), - createReplicationService(containerSet)), (dn, p) -> { + createReplicationService(controller)), (dn, p) -> { }); } @@ -185,12 +191,22 @@ public void testClientServerWithContainerDispatcher() throws Exception { .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()); ContainerSet containerSet = new ContainerSet(); + VolumeSet volumeSet = mock(VolumeSet.class); + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map handlers = Maps.newHashMap(); + for (ContainerProtos.ContainerType containerType : + ContainerProtos.ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType( + containerType, conf, null, containerSet, volumeSet, metrics)); + } HddsDispatcher dispatcher = new HddsDispatcher( - conf, mock(ContainerSet.class), mock(VolumeSet.class), null); + conf, containerSet, volumeSet, handlers, null, metrics); dispatcher.init(); DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, - createReplicationService(containerSet)); + createReplicationService( + new ContainerController(containerSet, null))); client = new XceiverClientGrpc(pipeline, conf); server.start(); diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java index f9e57536ce..01b51fa76c 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java @@ -17,9 +17,12 @@ */ package org.apache.hadoop.ozone.genesis; +import com.google.common.collect.Maps; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.statemachine .DatanodeStateMachine.DatanodeStates; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; @@ -44,6 +47,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -100,9 +104,18 @@ public void initialize() throws IOException { ContainerSet containerSet = new ContainerSet(); VolumeSet volumeSet = new VolumeSet(datanodeUuid, conf); - - dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, - new StateContext(conf, DatanodeStates.RUNNING, null)); + StateContext context = new StateContext( + conf, DatanodeStates.RUNNING, null); + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map handlers = Maps.newHashMap(); + for (ContainerProtos.ContainerType containerType : + ContainerProtos.ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType( + containerType, conf, context, containerSet, volumeSet, metrics)); + } + dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, handlers, + context, metrics); dispatcher.init(); containerCount = new AtomicInteger(); diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java index a91e1904ab..4a0cb611a9 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.container.common.transport .server.XceiverServerSpi; @@ -128,7 +127,7 @@ private void startFreon() throws Exception { private StateMachine getStateMachine() throws Exception { XceiverServerSpi server = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(). - getContainer().getServer(HddsProtos.ReplicationType.RATIS); + getContainer().getWriteChannel(); RaftServerProxy proxy = (RaftServerProxy)(((XceiverServerRatis)server).getServer()); RaftGroupId groupId = proxy.getGroupIds().iterator().next();