From 3517a47897457c11096ab57a4cb0b096a838a3ec Mon Sep 17 00:00:00 2001 From: Nanda kumar Date: Mon, 30 Jul 2018 21:18:42 +0530 Subject: [PATCH] HDDS-287. Add Close ContainerAction to Datanode#StateContext when the container gets full. Contributed by Nanda kumar. --- .../container/common/impl/HddsDispatcher.java | 63 +++++++- .../statemachine/DatanodeStateMachine.java | 2 +- .../common/statemachine/StateContext.java | 14 +- .../container/ozoneimpl/OzoneContainer.java | 6 +- .../common/impl/TestHddsDispatcher.java | 152 ++++++++++++++++++ .../container/common/impl/package-info.java | 22 +++ .../common/interfaces/TestHandler.java | 4 +- .../ozoneimpl/TestOzoneContainer.java | 2 +- .../ozone/container/common/TestEndPoint.java | 12 +- .../impl/TestCloseContainerHandler.java | 2 +- .../metrics/TestContainerMetrics.java | 2 +- .../ozoneimpl/TestOzoneContainer.java | 2 +- .../container/server/TestContainerServer.java | 2 +- .../genesis/BenchMarkDatanodeDispatcher.java | 6 +- 14 files changed, 270 insertions(+), 21 deletions(-) create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/package-info.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 3d418e5ec4..ee232db73e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -21,12 +21,21 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerInfo; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -35,11 +44,14 @@ .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerLifeCycleState; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.Optional; /** * Ozone Container dispatcher takes a call from the netty server and routes it @@ -53,6 +65,8 @@ public class HddsDispatcher implements ContainerDispatcher { private final Configuration conf; private final ContainerSet containerSet; private final VolumeSet volumeSet; + private final StateContext context; + private final float containerCloseThreshold; private String scmID; private ContainerMetrics metrics; @@ -61,10 +75,11 @@ public class HddsDispatcher implements ContainerDispatcher { * XceiverServerHandler. */ public HddsDispatcher(Configuration config, ContainerSet contSet, - VolumeSet volumes) { + VolumeSet volumes, StateContext context) { this.conf = config; this.containerSet = contSet; this.volumeSet = volumes; + this.context = context; this.handlers = Maps.newHashMap(); this.metrics = ContainerMetrics.create(conf); for (ContainerType containerType : ContainerType.values()) { @@ -72,6 +87,9 @@ public HddsDispatcher(Configuration config, ContainerSet contSet, Handler.getHandlerForContainerType( containerType, conf, containerSet, volumeSet, metrics)); } + this.containerCloseThreshold = conf.getFloat( + ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, + ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); } @@ -113,7 +131,11 @@ public ContainerCommandResponseProto dispatch( } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, msg); } - + // Small performance optimization. We check if the operation is of type + // write before trying to send CloseContainerAction. + if (!HddsUtils.isReadOnly(msg)) { + sendCloseContainerActionIfNeeded(container); + } Handler handler = getHandler(containerType); if (handler == null) { StorageContainerException ex = new StorageContainerException("Invalid " + @@ -130,6 +152,43 @@ public ContainerCommandResponseProto dispatch( } } + /** + * If the container usage reaches the close threshold we send Close + * ContainerAction to SCM. + * + * @param container current state of container + */ + private void sendCloseContainerActionIfNeeded(Container container) { + // We have to find a more efficient way to close a container. + Boolean isOpen = Optional.ofNullable(container) + .map(cont -> cont.getContainerState() == ContainerLifeCycleState.OPEN) + .orElse(Boolean.FALSE); + if (isOpen) { + ContainerData containerData = container.getContainerData(); + double containerUsedPercentage = 1.0f * containerData.getBytesUsed() / + StorageUnit.GB.toBytes(containerData.getMaxSizeGB()); + if (containerUsedPercentage >= containerCloseThreshold) { + + ContainerInfo containerInfo = ContainerInfo.newBuilder() + .setContainerID(containerData.getContainerID()) + .setReadCount(containerData.getReadCount()) + .setWriteCount(containerData.getWriteCount()) + .setReadBytes(containerData.getReadBytes()) + .setWriteBytes(containerData.getWriteBytes()) + .setUsed(containerData.getBytesUsed()) + .setState(HddsProtos.LifeCycleState.OPEN) + .build(); + + ContainerAction action = ContainerAction.newBuilder() + .setContainer(containerInfo) + .setAction(ContainerAction.Action.CLOSE) + .setReason(ContainerAction.Reason.CONTAINER_FULL) + .build(); + context.addContainerActionIfAbsent(action); + } + } + } + @Override public Handler getHandler(ContainerProtos.ContainerType containerType) { return handlers.get(containerType); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 69a243e937..1ac42dd45c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -89,7 +89,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, heartbeatFrequency = TimeUnit.SECONDS.toMillis( getScmHeartbeatInterval(conf)); container = new OzoneContainer(this.datanodeDetails, - new OzoneConfiguration(conf)); + new OzoneConfiguration(conf), context); nextHB = new AtomicLong(Time.monotonicNow()); // When we add new handlers just adding a new handler here should do the diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 7862cc6236..19c949680c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -36,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -212,6 +211,19 @@ public void addContainerAction(ContainerAction containerAction) { } } + /** + * Add ContainerAction to ContainerAction queue if it's not present. + * + * @param containerAction ContainerAction to be added + */ + public void addContainerActionIfAbsent(ContainerAction containerAction) { + synchronized (containerActions) { + if (!containerActions.contains(containerAction)) { + containerActions.add(containerAction); + } + } + } + /** * Returns all the pending ContainerActions from the ContainerAction queue, * or empty list if the queue is empty. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 30fe1134ad..85c947fecb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; @@ -70,7 +71,7 @@ public class OzoneContainer { * @throws IOException */ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration - conf) throws IOException { + conf, StateContext context) throws IOException { this.dnDetails = datanodeDetails; this.config = conf; this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); @@ -79,7 +80,8 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT); buildContainerSet(); - hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet); + hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet, + context); server = new XceiverServerSpi[]{ useGrpc ? new XceiverServerGrpc(datanodeDetails, this.config, this .hddsDispatcher) : diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java new file mode 100644 index 0000000000..b10778217b --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto + .ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .WriteChunkRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test-cases to verify the functionality of HddsDispatcher. + */ +public class TestHddsDispatcher { + + @Test + public void testContainerCloseActionWhenFull() throws IOException { + String testDir = GenericTestUtils.getTempPath( + TestHddsDispatcher.class.getSimpleName()); + try { + UUID scmId = UUID.randomUUID(); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(HDDS_DATANODE_DIR_KEY, testDir); + DatanodeDetails dd = randomDatanodeDetails(); + ContainerSet containerSet = new ContainerSet(); + VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf); + StateContext context = Mockito.mock(StateContext.class); + KeyValueContainerData containerData = new KeyValueContainerData(1L, 1); + Container container = new KeyValueContainer(containerData, conf); + container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), + scmId.toString()); + containerSet.addContainer(container); + HddsDispatcher hddsDispatcher = new HddsDispatcher( + conf, containerSet, volumeSet, context); + hddsDispatcher.setScmId(scmId.toString()); + ContainerCommandResponseProto responseOne = hddsDispatcher.dispatch( + getWriteChunkRequest(dd.getUuidString(), 1L, 1L)); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, + responseOne.getResult()); + verify(context, times(0)) + .addContainerActionIfAbsent(Mockito.any(ContainerAction.class)); + containerData.setBytesUsed(Double.valueOf( + StorageUnit.MB.toBytes(950)).longValue()); + ContainerCommandResponseProto responseTwo = hddsDispatcher.dispatch( + getWriteChunkRequest(dd.getUuidString(), 1L, 2L)); + Assert.assertEquals(ContainerProtos.Result.SUCCESS, + responseTwo.getResult()); + verify(context, times(1)) + .addContainerActionIfAbsent(Mockito.any(ContainerAction.class)); + + } finally { + FileUtils.deleteDirectory(new File(testDir)); + } + + } + + // This method has to be removed once we move scm/TestUtils.java + // from server-scm project to container-service or to common project. + private static DatanodeDetails randomDatanodeDetails() { + DatanodeDetails.Port containerPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.STANDALONE, 0); + DatanodeDetails.Port ratisPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.RATIS, 0); + DatanodeDetails.Port restPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.REST, 0); + DatanodeDetails.Builder builder = DatanodeDetails.newBuilder(); + builder.setUuid(UUID.randomUUID().toString()) + .setHostName("localhost") + .setIpAddress("127.0.0.1") + .addPort(containerPort) + .addPort(ratisPort) + .addPort(restPort); + return builder.build(); + } + + private ContainerCommandRequestProto getWriteChunkRequest( + String datanodeId, Long containerId, Long localId) { + + ByteString data = ByteString.copyFrom( + UUID.randomUUID().toString().getBytes()); + ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo + .newBuilder() + .setChunkName( + DigestUtils.md5Hex("dummy-key") + "_stream_" + + containerId + "_chunk_" + localId) + .setOffset(0) + .setLen(data.size()) + .build(); + + WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto + .newBuilder() + .setBlockID(new BlockID(containerId, localId) + .getDatanodeBlockIDProtobuf()) + .setChunkData(chunk) + .setData(data); + + return ContainerCommandRequestProto + .newBuilder() + .setContainerID(containerId) + .setCmdType(ContainerProtos.Type.WriteChunk) + .setTraceID(UUID.randomUUID().toString()) + .setDatanodeUuid(datanodeId) + .setWriteChunk(writeChunkRequest) + .build(); + } + +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/package-info.java new file mode 100644 index 0000000000..07c78c0498 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/** + * Datanode container related test-cases. + */ +package org.apache.hadoop.ozone.container.common.impl; \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java index 6660e9b5b5..c9733f864a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java @@ -52,15 +52,13 @@ public class TestHandler { private VolumeSet volumeSet; private Handler handler; - private final static String DATANODE_UUID = UUID.randomUUID().toString(); - @Before public void setup() throws Exception { this.conf = new Configuration(); this.containerSet = Mockito.mock(ContainerSet.class); this.volumeSet = Mockito.mock(VolumeSet.class); - this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet); + this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null); } @Test diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index 284ffa386e..19ec6a2545 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -86,7 +86,7 @@ public void testBuildContainerMap() throws Exception { // When OzoneContainer is started, the containers from disk should be // loaded into the containerSet. OzoneContainer ozoneContainer = new - OzoneContainer(datanodeDetails, conf); + OzoneContainer(datanodeDetails, conf, null); ContainerSet containerset = ozoneContainer.getContainerSet(); assertEquals(10, containerset.containerCount()); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index e24e73e204..e9359b8e3f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -151,8 +151,8 @@ public void testGetVersionTask() throws Exception { OzoneConfiguration conf = SCMTestUtils.getConf(); try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, serverAddress, 1000)) { - OzoneContainer ozoneContainer = new OzoneContainer(TestUtils.randomDatanodeDetails(), - conf); + OzoneContainer ozoneContainer = new OzoneContainer( + TestUtils.randomDatanodeDetails(), conf, null); rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, conf, ozoneContainer); @@ -176,7 +176,7 @@ public void testCheckVersionResponse() throws Exception { GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer .captureLogs(VersionEndpointTask.LOG); OzoneContainer ozoneContainer = new OzoneContainer(TestUtils - .randomDatanodeDetails(), conf); + .randomDatanodeDetails(), conf, null); rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, conf, ozoneContainer); @@ -228,7 +228,7 @@ public void testGetVersionToInvalidEndpoint() throws Exception { nonExistentServerAddress, 1000)) { rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); OzoneContainer ozoneContainer = new OzoneContainer(TestUtils.randomDatanodeDetails(), - conf); + conf, null); VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, conf, ozoneContainer); EndpointStateMachine.EndPointStates newState = versionTask.call(); @@ -254,8 +254,8 @@ public void testGetVersionAssertRpcTimeOut() throws Exception { try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, serverAddress, (int) rpcTimeout)) { rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); - OzoneContainer ozoneContainer = new OzoneContainer(TestUtils.randomDatanodeDetails(), - conf); + OzoneContainer ozoneContainer = new OzoneContainer( + TestUtils.randomDatanodeDetails(), conf, null); VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, conf, ozoneContainer); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java index d67cf88888..73fa70d1db 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java @@ -86,7 +86,7 @@ public static void setup() throws Exception { .setHostName("localhost").setIpAddress("127.0.0.1").build(); volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); - dispatcher = new HddsDispatcher(conf, containerSet, volumeSet); + dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null); handler = (KeyValueHandler) dispatcher .getHandler(ContainerProtos.ContainerType.KeyValueContainer); openContainerBlockMap = handler.getOpenContainerBlockMap(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java index 13ed192bcb..19b561a427 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java @@ -78,7 +78,7 @@ public void testContainerMetrics() throws Exception { datanodeDetails.getUuidString(), conf); ContainerSet containerSet = new ContainerSet(); HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet, - volumeSet); + volumeSet, null); dispatcher.setScmId(UUID.randomUUID().toString()); server = new XceiverServer(datanodeDetails, conf, dispatcher); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index d271ed3966..215dd21d8e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -72,7 +72,7 @@ public void testCreateOzoneContainer() throws Exception { conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false); container = new OzoneContainer(TestUtils.randomDatanodeDetails(), - conf); + conf, null); //Setting scmId, as we start manually ozone container. container.getDispatcher().setScmId(UUID.randomUUID().toString()); container.start(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java index bdb26fbe65..ebcc9302c3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java @@ -214,7 +214,7 @@ public void testClientServerWithContainerDispatcher() throws Exception { .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()); HddsDispatcher dispatcher = new HddsDispatcher( - conf, mock(ContainerSet.class), mock(VolumeSet.class)); + conf, mock(ContainerSet.class), mock(VolumeSet.class), null); dispatcher.init(); DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); server = new XceiverServer(datanodeDetails, conf, dispatcher); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java index e757a7f22b..3c49fb6b18 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java @@ -20,6 +20,9 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine.DatanodeStates; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; @@ -104,7 +107,8 @@ public void initialize() throws IOException { ContainerSet containerSet = new ContainerSet(); VolumeSet volumeSet = new VolumeSet(datanodeUuid, conf); - dispatcher = new HddsDispatcher(conf, containerSet, volumeSet); + dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, + new StateContext(conf, DatanodeStates.RUNNING, null)); dispatcher.init(); containerCount = new AtomicInteger();