HDDS-287. Add Close ContainerAction to Datanode#StateContext when the container gets full. Contributed by Nanda kumar.

This commit is contained in:
Nanda kumar 2018-07-30 21:18:42 +05:30
parent 952dc2fd55
commit 3517a47897
14 changed files with 270 additions and 21 deletions

View File

@ -21,12 +21,21 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@ -35,11 +44,14 @@
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerLifeCycleState;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
/**
* Ozone Container dispatcher takes a call from the netty server and routes it
@ -53,6 +65,8 @@ public class HddsDispatcher implements ContainerDispatcher {
private final Configuration conf;
private final ContainerSet containerSet;
private final VolumeSet volumeSet;
private final StateContext context;
private final float containerCloseThreshold;
private String scmID;
private ContainerMetrics metrics;
@ -61,10 +75,11 @@ public class HddsDispatcher implements ContainerDispatcher {
* XceiverServerHandler.
*/
public HddsDispatcher(Configuration config, ContainerSet contSet,
VolumeSet volumes) {
VolumeSet volumes, StateContext context) {
this.conf = config;
this.containerSet = contSet;
this.volumeSet = volumes;
this.context = context;
this.handlers = Maps.newHashMap();
this.metrics = ContainerMetrics.create(conf);
for (ContainerType containerType : ContainerType.values()) {
@ -72,6 +87,9 @@ public HddsDispatcher(Configuration config, ContainerSet contSet,
Handler.getHandlerForContainerType(
containerType, conf, containerSet, volumeSet, metrics));
}
this.containerCloseThreshold = conf.getFloat(
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
}
@ -113,7 +131,11 @@ public ContainerCommandResponseProto dispatch(
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, msg);
}
// Small performance optimization. We check if the operation is of type
// write before trying to send CloseContainerAction.
if (!HddsUtils.isReadOnly(msg)) {
sendCloseContainerActionIfNeeded(container);
}
Handler handler = getHandler(containerType);
if (handler == null) {
StorageContainerException ex = new StorageContainerException("Invalid " +
@ -130,6 +152,43 @@ public ContainerCommandResponseProto dispatch(
}
}
/**
* If the container usage reaches the close threshold we send Close
* ContainerAction to SCM.
*
* @param container current state of container
*/
private void sendCloseContainerActionIfNeeded(Container container) {
// We have to find a more efficient way to close a container.
Boolean isOpen = Optional.ofNullable(container)
.map(cont -> cont.getContainerState() == ContainerLifeCycleState.OPEN)
.orElse(Boolean.FALSE);
if (isOpen) {
ContainerData containerData = container.getContainerData();
double containerUsedPercentage = 1.0f * containerData.getBytesUsed() /
StorageUnit.GB.toBytes(containerData.getMaxSizeGB());
if (containerUsedPercentage >= containerCloseThreshold) {
ContainerInfo containerInfo = ContainerInfo.newBuilder()
.setContainerID(containerData.getContainerID())
.setReadCount(containerData.getReadCount())
.setWriteCount(containerData.getWriteCount())
.setReadBytes(containerData.getReadBytes())
.setWriteBytes(containerData.getWriteBytes())
.setUsed(containerData.getBytesUsed())
.setState(HddsProtos.LifeCycleState.OPEN)
.build();
ContainerAction action = ContainerAction.newBuilder()
.setContainer(containerInfo)
.setAction(ContainerAction.Action.CLOSE)
.setReason(ContainerAction.Reason.CONTAINER_FULL)
.build();
context.addContainerActionIfAbsent(action);
}
}
}
@Override
public Handler getHandler(ContainerProtos.ContainerType containerType) {
return handlers.get(containerType);

View File

@ -89,7 +89,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
heartbeatFrequency = TimeUnit.SECONDS.toMillis(
getScmHeartbeatInterval(conf));
container = new OzoneContainer(this.datanodeDetails,
new OzoneConfiguration(conf));
new OzoneConfiguration(conf), context);
nextHB = new AtomicLong(Time.monotonicNow());
// When we add new handlers just adding a new handler here should do the

View File

@ -36,7 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
@ -212,6 +211,19 @@ public void addContainerAction(ContainerAction containerAction) {
}
}
/**
* Add ContainerAction to ContainerAction queue if it's not present.
*
* @param containerAction ContainerAction to be added
*/
public void addContainerActionIfAbsent(ContainerAction containerAction) {
synchronized (containerActions) {
if (!containerActions.contains(containerAction)) {
containerActions.add(containerAction);
}
}
}
/**
* Returns all the pending ContainerActions from the ContainerAction queue,
* or empty list if the queue is empty.

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@ -70,7 +71,7 @@ public class OzoneContainer {
* @throws IOException
*/
public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
conf) throws IOException {
conf, StateContext context) throws IOException {
this.dnDetails = datanodeDetails;
this.config = conf;
this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
@ -79,7 +80,8 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
buildContainerSet();
hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet);
hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
context);
server = new XceiverServerSpi[]{
useGrpc ? new XceiverServerGrpc(datanodeDetails, this.config, this
.hddsDispatcher) :

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto
.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Test-cases to verify the functionality of HddsDispatcher.
*/
public class TestHddsDispatcher {
@Test
public void testContainerCloseActionWhenFull() throws IOException {
String testDir = GenericTestUtils.getTempPath(
TestHddsDispatcher.class.getSimpleName());
try {
UUID scmId = UUID.randomUUID();
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_DATANODE_DIR_KEY, testDir);
DatanodeDetails dd = randomDatanodeDetails();
ContainerSet containerSet = new ContainerSet();
VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf);
StateContext context = Mockito.mock(StateContext.class);
KeyValueContainerData containerData = new KeyValueContainerData(1L, 1);
Container container = new KeyValueContainer(containerData, conf);
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
scmId.toString());
containerSet.addContainer(container);
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, context);
hddsDispatcher.setScmId(scmId.toString());
ContainerCommandResponseProto responseOne = hddsDispatcher.dispatch(
getWriteChunkRequest(dd.getUuidString(), 1L, 1L));
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
responseOne.getResult());
verify(context, times(0))
.addContainerActionIfAbsent(Mockito.any(ContainerAction.class));
containerData.setBytesUsed(Double.valueOf(
StorageUnit.MB.toBytes(950)).longValue());
ContainerCommandResponseProto responseTwo = hddsDispatcher.dispatch(
getWriteChunkRequest(dd.getUuidString(), 1L, 2L));
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
responseTwo.getResult());
verify(context, times(1))
.addContainerActionIfAbsent(Mockito.any(ContainerAction.class));
} finally {
FileUtils.deleteDirectory(new File(testDir));
}
}
// This method has to be removed once we move scm/TestUtils.java
// from server-scm project to container-service or to common project.
private static DatanodeDetails randomDatanodeDetails() {
DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.STANDALONE, 0);
DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.RATIS, 0);
DatanodeDetails.Port restPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.REST, 0);
DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
builder.setUuid(UUID.randomUUID().toString())
.setHostName("localhost")
.setIpAddress("127.0.0.1")
.addPort(containerPort)
.addPort(ratisPort)
.addPort(restPort);
return builder.build();
}
private ContainerCommandRequestProto getWriteChunkRequest(
String datanodeId, Long containerId, Long localId) {
ByteString data = ByteString.copyFrom(
UUID.randomUUID().toString().getBytes());
ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo
.newBuilder()
.setChunkName(
DigestUtils.md5Hex("dummy-key") + "_stream_"
+ containerId + "_chunk_" + localId)
.setOffset(0)
.setLen(data.size())
.build();
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
.setBlockID(new BlockID(containerId, localId)
.getDatanodeBlockIDProtobuf())
.setChunkData(chunk)
.setData(data);
return ContainerCommandRequestProto
.newBuilder()
.setContainerID(containerId)
.setCmdType(ContainerProtos.Type.WriteChunk)
.setTraceID(UUID.randomUUID().toString())
.setDatanodeUuid(datanodeId)
.setWriteChunk(writeChunkRequest)
.build();
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Datanode container related test-cases.
*/
package org.apache.hadoop.ozone.container.common.impl;

View File

@ -52,15 +52,13 @@ public class TestHandler {
private VolumeSet volumeSet;
private Handler handler;
private final static String DATANODE_UUID = UUID.randomUUID().toString();
@Before
public void setup() throws Exception {
this.conf = new Configuration();
this.containerSet = Mockito.mock(ContainerSet.class);
this.volumeSet = Mockito.mock(VolumeSet.class);
this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null);
}
@Test

View File

@ -86,7 +86,7 @@ public void testBuildContainerMap() throws Exception {
// When OzoneContainer is started, the containers from disk should be
// loaded into the containerSet.
OzoneContainer ozoneContainer = new
OzoneContainer(datanodeDetails, conf);
OzoneContainer(datanodeDetails, conf, null);
ContainerSet containerset = ozoneContainer.getContainerSet();
assertEquals(10, containerset.containerCount());
}

View File

@ -151,8 +151,8 @@ public void testGetVersionTask() throws Exception {
OzoneConfiguration conf = SCMTestUtils.getConf();
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
serverAddress, 1000)) {
OzoneContainer ozoneContainer = new OzoneContainer(TestUtils.randomDatanodeDetails(),
conf);
OzoneContainer ozoneContainer = new OzoneContainer(
TestUtils.randomDatanodeDetails(), conf, null);
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
@ -176,7 +176,7 @@ public void testCheckVersionResponse() throws Exception {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(VersionEndpointTask.LOG);
OzoneContainer ozoneContainer = new OzoneContainer(TestUtils
.randomDatanodeDetails(), conf);
.randomDatanodeDetails(), conf, null);
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
@ -228,7 +228,7 @@ public void testGetVersionToInvalidEndpoint() throws Exception {
nonExistentServerAddress, 1000)) {
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
OzoneContainer ozoneContainer = new OzoneContainer(TestUtils.randomDatanodeDetails(),
conf);
conf, null);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
EndpointStateMachine.EndPointStates newState = versionTask.call();
@ -254,8 +254,8 @@ public void testGetVersionAssertRpcTimeOut() throws Exception {
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
serverAddress, (int) rpcTimeout)) {
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
OzoneContainer ozoneContainer = new OzoneContainer(TestUtils.randomDatanodeDetails(),
conf);
OzoneContainer ozoneContainer = new OzoneContainer(
TestUtils.randomDatanodeDetails(), conf, null);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);

View File

@ -86,7 +86,7 @@ public static void setup() throws Exception {
.setHostName("localhost").setIpAddress("127.0.0.1").build();
volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null);
handler = (KeyValueHandler) dispatcher
.getHandler(ContainerProtos.ContainerType.KeyValueContainer);
openContainerBlockMap = handler.getOpenContainerBlockMap();

View File

@ -78,7 +78,7 @@ public void testContainerMetrics() throws Exception {
datanodeDetails.getUuidString(), conf);
ContainerSet containerSet = new ContainerSet();
HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet,
volumeSet);
volumeSet, null);
dispatcher.setScmId(UUID.randomUUID().toString());
server = new XceiverServer(datanodeDetails, conf, dispatcher);

View File

@ -72,7 +72,7 @@ public void testCreateOzoneContainer() throws Exception {
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
container = new OzoneContainer(TestUtils.randomDatanodeDetails(),
conf);
conf, null);
//Setting scmId, as we start manually ozone container.
container.getDispatcher().setScmId(UUID.randomUUID().toString());
container.start();

View File

@ -214,7 +214,7 @@ public void testClientServerWithContainerDispatcher() throws Exception {
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
HddsDispatcher dispatcher = new HddsDispatcher(
conf, mock(ContainerSet.class), mock(VolumeSet.class));
conf, mock(ContainerSet.class), mock(VolumeSet.class), null);
dispatcher.init();
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
server = new XceiverServer(datanodeDetails, conf, dispatcher);

View File

@ -20,6 +20,9 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.commons.codec.digest.DigestUtils;
@ -104,7 +107,8 @@ public void initialize() throws IOException {
ContainerSet containerSet = new ContainerSet();
VolumeSet volumeSet = new VolumeSet(datanodeUuid, conf);
dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
dispatcher = new HddsDispatcher(conf, containerSet, volumeSet,
new StateContext(conf, DatanodeStates.RUNNING, null));
dispatcher.init();
containerCount = new AtomicInteger();