HDDS-774. Remove OpenContainerBlockMap from datanode. Contributed by Shashikant Banerjee.

Shashikant Banerjee 2018-11-14 20:05:56 +05:30
parent a948281706
commit b57cc73f83
3 changed files with 2 additions and 384 deletions
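In short: KeyValueHandler previously buffered uncommitted block metadata in an in-memory OpenContainerBlockMap (populated during WriteChunk, flushed on CloseContainer, and consulted by GetCommittedBlockLength); after this change, PutBlock is the single commit point and close has nothing to flush. A minimal before/after sketch of the commit path, simplified from the hunks below rather than quoted verbatim:

    // Before: chunks registered in the map, committed lazily on close.
    openContainerBlockMap.addChunk(blockID, chunkInfoProto);  // WriteChunk (COMMIT_DATA/COMBINED)
    commitPendingBlocks(kvContainer);                         // CloseContainer

    // After: PutBlock commits block metadata directly to the block manager.
    blockLength = blockManager.putBlock(kvContainer, blockData);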

KeyValueHandler.java (org.apache.hadoop.ozone.container.keyvalue)

@@ -54,7 +54,6 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
@@ -109,7 +108,6 @@ public class KeyValueHandler extends Handler {
private final VolumeChoosingPolicy volumeChoosingPolicy;
private final long maxContainerSize;
private final AutoCloseableLock handlerLock;
private final OpenContainerBlockMap openContainerBlockMap;
public KeyValueHandler(Configuration config, ContainerSet contSet,
VolumeSet volSet, ContainerMetrics metrics) {
@@ -138,21 +136,12 @@ public KeyValueHandler(Configuration config, ContainerSet contSet,
// this handler lock is used for synchronizing createContainer Requests,
// so using a fair lock here.
handlerLock = new AutoCloseableLock(new ReentrantLock(true));
openContainerBlockMap = new OpenContainerBlockMap();
}
@VisibleForTesting
public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() {
return volumeChoosingPolicy;
}
/**
* Returns OpenContainerBlockMap instance.
*
* @return OpenContainerBlockMap
*/
public OpenContainerBlockMap getOpenContainerBlockMap() {
return openContainerBlockMap;
}
@Override
public ContainerCommandResponseProto handle(
@@ -355,7 +344,6 @@ ContainerCommandResponseProto handleDeleteContainer(
} else {
long containerId = kvContainer.getContainerData().getContainerID();
containerSet.removeContainer(containerId);
openContainerBlockMap.removeContainer(containerId);
// Release the lock first.
// Avoid holding write locks for disk operations
kvContainer.writeUnlock();
@@ -388,19 +376,11 @@ ContainerCommandResponseProto handleCloseContainer(
long containerID = kvContainer.getContainerData().getContainerID();
try {
checkContainerOpen(kvContainer);
KeyValueContainerData kvData = kvContainer.getContainerData();
// remove the container from the open block map once all the blocks
// have been committed and the container is closed
commitPendingBlocks(kvContainer);
// TODO : The close command should move the container to either quasi
// closed/closed depending upon how the closeContainer gets executed.
// If it arrives by Standalone, it will be moved to Quasi Closed or
// otherwise moved to Closed state if it gets executed via Ratis.
kvContainer.close();
// make sure the container's open keys get removed from the BlockMap
openContainerBlockMap.removeContainer(kvData.getContainerID());
} catch (StorageContainerException ex) {
if (ex.getResult() == CLOSED_CONTAINER_IO) {
LOG.debug("Container {} is already closed.", containerID);
@@ -434,8 +414,9 @@ ContainerCommandResponseProto handlePutBlock(
BlockData blockData = BlockData.getFromProtoBuf(
request.getPutBlock().getBlockData());
Preconditions.checkNotNull(blockData);
long numBytes = blockData.getProtoBufMessage().toByteArray().length;
blockLength = commitKey(blockData, kvContainer);
blockLength = blockManager.putBlock(kvContainer, blockData);
metrics.incContainerBytesStats(Type.PutBlock, numBytes);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
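Note on the hunk above: the two adjacent blockLength assignments are the removed line (commitKey, which detoured through the open-block map) and its replacement (a direct blockManager.putBlock call); the rendered diff simply shows them back to back. The net effect, sketched with the surrounding class's own names:

    BlockData blockData = BlockData.getFromProtoBuf(
        request.getPutBlock().getBlockData());
    Preconditions.checkNotNull(blockData);
    long numBytes = blockData.getProtoBufMessage().toByteArray().length;
    // The block manager is now the single commit point; nothing is staged
    // in memory between WriteChunk and PutBlock.
    blockLength = blockManager.putBlock(kvContainer, blockData);
    metrics.incContainerBytesStats(Type.PutBlock, numBytes);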
@@ -448,24 +429,6 @@ ContainerCommandResponseProto handlePutBlock(
return BlockUtils.putBlockResponseSuccess(request, blockLength);
}
private void commitPendingBlocks(KeyValueContainer kvContainer)
throws IOException {
long containerId = kvContainer.getContainerData().getContainerID();
List<BlockData> pendingBlocks =
this.openContainerBlockMap.getOpenBlocks(containerId);
for(BlockData blockData : pendingBlocks) {
commitKey(blockData, kvContainer);
}
}
private long commitKey(BlockData blockData, KeyValueContainer kvContainer)
throws IOException {
Preconditions.checkNotNull(blockData);
long length = blockManager.putBlock(kvContainer, blockData);
// update the open key map in containerManager
this.openContainerBlockMap.removeFromBlockMap(blockData.getBlockID());
return length;
}
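For context on what disappears here: OpenContainerBlockMap was essentially a per-container index of blocks that had chunks written but no PutBlock yet, which commitPendingBlocks drained into the block manager at close time. A rough, hypothetical sketch of the shape of that structure (a simplification for illustration, not the deleted source):

    // Hypothetical simplification: containerId -> (localBlockId -> pending BlockData).
    private final ConcurrentHashMap<Long, ConcurrentHashMap<Long, BlockData>>
        openBlocks = new ConcurrentHashMap<>();

    void addChunk(BlockID blockID, ContainerProtos.ChunkInfo chunk) {
      openBlocks
          .computeIfAbsent(blockID.getContainerID(), id -> new ConcurrentHashMap<>())
          .computeIfAbsent(blockID.getLocalID(), id -> new BlockData(blockID))
          .addChunk(chunk);
    }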
/**
* Handle Get Block operation. Calls BlockManager to process the request.
*/
@@ -513,11 +476,6 @@ ContainerCommandResponseProto handleGetCommittedBlockLength(
try {
BlockID blockID = BlockID
.getFromProtobuf(request.getGetCommittedBlockLength().getBlockID());
// Check if it really exists in the openContainerBlockMap
if (openContainerBlockMap.checkIfBlockExists(blockID)) {
String msg = "Block " + blockID + " is not committed yet.";
throw new StorageContainerException(msg, BLOCK_NOT_COMMITTED);
}
blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
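Behavioral note on this hunk: the handler no longer distinguishes "written but not committed" blocks, so the BLOCK_NOT_COMMITTED pre-check is gone and the lookup goes straight to the block manager. A block that was never committed via PutBlock now surfaces as whatever error blockManager.getCommittedBlockLength raises for an unknown block. The remaining path, sketched with the handler's own names:

    BlockID blockID = BlockID
        .getFromProtobuf(request.getGetCommittedBlockLength().getBlockID());
    blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID);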
@@ -617,7 +575,6 @@ ContainerCommandResponseProto handleDeleteChunk(
Preconditions.checkNotNull(chunkInfo);
chunkManager.deleteChunk(kvContainer, blockID, chunkInfo);
openContainerBlockMap.removeChunk(blockID, chunkInfoProto);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
@@ -666,13 +623,6 @@ ContainerCommandResponseProto handleWriteChunk(
metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
.getChunkData().getLen());
}
if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA
|| request.getWriteChunk().getStage() == Stage.COMBINED) {
// the openContainerBlockMap should be updated only during
// COMMIT_STAGE of handling write chunk request.
openContainerBlockMap.addChunk(blockID, chunkInfoProto);
}
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
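Nothing replaces the removed bookkeeping here: chunk data written at the COMMIT_DATA or COMBINED stage is persisted by the chunk manager, but no block metadata exists for it until the client issues PutBlock. The implied client ordering (hypothetical pseudocode-style sketch, not an API quote):

    writeChunk(blockID, chunk1);   // data persisted; no metadata tracked
    writeChunk(blockID, chunk2);
    putBlock(blockID, chunkList);  // the only point where metadata is committed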

TestCloseContainerHandler.java (org.apache.hadoop.ozone.container.common.impl, deleted by this commit)

@@ -1,261 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.impl;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.Assert;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.LinkedList;
import static org.apache.hadoop.ozone.container.ContainerTestHelper
.createSingleNodePipeline;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
import static org.apache.hadoop.ozone.container.ContainerTestHelper
.setDataChecksum;
/**
* Simple tests to verify the closeContainer handler on the Datanode.
*/
public class TestCloseContainerHandler {
@Rule
public TestRule timeout = new Timeout(300000);
private static Configuration conf;
private static HddsDispatcher dispatcher;
private static ContainerSet containerSet;
private static VolumeSet volumeSet;
private static KeyValueHandler handler;
private static OpenContainerBlockMap openContainerBlockMap;
private final static String DATANODE_UUID = UUID.randomUUID().toString();
private static final String BASE_DIR = MiniDFSCluster.getBaseDirectory();
private static final String VOLUME_1 = BASE_DIR + "disk1";
private static final String VOLUME_2 = BASE_DIR + "disk2";
@BeforeClass
public static void setup() throws Exception {
conf = new Configuration();
String dataDirKey = VOLUME_1 + "," + VOLUME_2;
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirKey);
containerSet = new ContainerSet();
DatanodeDetails datanodeDetails =
DatanodeDetails.newBuilder().setUuid(DATANODE_UUID)
.setHostName("localhost").setIpAddress("127.0.0.1").build();
volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null);
handler = (KeyValueHandler) dispatcher
.getHandler(ContainerProtos.ContainerType.KeyValueContainer);
openContainerBlockMap = handler.getOpenContainerBlockMap();
dispatcher.setScmId(UUID.randomUUID().toString());
}
@AfterClass
public static void shutdown() throws IOException {
// Delete the hdds volume root dir
List<HddsVolume> volumes = new ArrayList<>();
volumes.addAll(volumeSet.getVolumesList());
volumes.addAll(volumeSet.getFailedVolumesList());
for (HddsVolume volume : volumes) {
FileUtils.deleteDirectory(volume.getHddsRootDir());
}
volumeSet.shutdown();
}
private long createContainer() {
long testContainerId = ContainerTestHelper.getTestContainerID();
ContainerProtos.ContainerCommandRequestProto request =
ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.CreateContainer)
.setContainerID(testContainerId)
.setDatanodeUuid(DATANODE_UUID)
.setCreateContainer(ContainerProtos.CreateContainerRequestProto
.getDefaultInstance())
.build();
dispatcher.dispatch(request);
return testContainerId;
}
private List<ChunkInfo> writeChunkBuilder(BlockID blockID, Pipeline pipeline,
int chunkCount)
throws IOException, NoSuchAlgorithmException {
final int datalen = 1024;
long testContainerID = blockID.getContainerID();
List<ChunkInfo> chunkList = new LinkedList<>();
for (int x = 0; x < chunkCount; x++) {
ChunkInfo info = getChunk(blockID.getLocalID(), x, datalen * x, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
ContainerProtos.WriteChunkRequestProto.newBuilder();
writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
writeRequest.setChunkData(info.getProtoBufMessage());
writeRequest.setData(ByteString.copyFrom(data));
writeRequest.setStage(ContainerProtos.Stage.COMBINED);
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.WriteChunk);
request.setContainerID(blockID.getContainerID());
request.setWriteChunk(writeRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
dispatcher.dispatch(request.build());
chunkList.add(info);
}
return chunkList;
}
@Test
public void testPutKeyWithMultipleChunks()
throws IOException, NoSuchAlgorithmException {
long testContainerID = createContainer();
Assert.assertNotNull(containerSet.getContainer(testContainerID));
BlockID blockID = ContainerTestHelper.
getTestBlockID(testContainerID);
Pipeline pipeline = createSingleNodePipeline();
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
// the block should exist in the map
Assert.assertNotNull(
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()));
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
for (ChunkInfo i : chunkList) {
chunkProtoList.add(i.getProtoBufMessage());
}
blockData.setChunks(chunkProtoList);
ContainerProtos.PutBlockRequestProto.Builder putBlockRequestProto =
ContainerProtos.PutBlockRequestProto.newBuilder();
putBlockRequestProto.setBlockData(blockData.getProtoBufMessage());
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.PutBlock);
request.setContainerID(blockID.getContainerID());
request.setPutBlock(putBlockRequestProto);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
dispatcher.dispatch(request.build());
//the open block should be removed from Map
Assert.assertNull(
openContainerBlockMap.getBlockDataMap(testContainerID));
}
@Test
public void testDeleteChunk() throws Exception {
long testContainerID = createContainer();
Assert.assertNotNull(containerSet.getContainer(testContainerID));
BlockID blockID = ContainerTestHelper.
getTestBlockID(testContainerID);
Pipeline pipeline = createSingleNodePipeline();
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
// the key should exist in the map
Assert.assertNotNull(
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()));
Assert.assertTrue(
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()).getChunks().size() == 3);
ContainerProtos.DeleteChunkRequestProto.Builder deleteChunkProto =
ContainerProtos.DeleteChunkRequestProto.newBuilder();
deleteChunkProto.setBlockID(blockID.getDatanodeBlockIDProtobuf());
deleteChunkProto.setChunkData(chunkList.get(0).getProtoBufMessage());
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
ContainerProtos.WriteChunkRequestProto.newBuilder();
writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
writeRequest.setChunkData(chunkList.get(0).getProtoBufMessage());
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.DeleteChunk);
request.setContainerID(blockID.getContainerID());
request.setDeleteChunk(deleteChunkProto);
request.setWriteChunk(writeRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
dispatcher.dispatch(request.build());
Assert.assertTrue(
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()).getChunks().size() == 2);
}
@Test
public void testCloseContainer() throws Exception {
long testContainerID = createContainer();
Assert.assertNotNull(containerSet.getContainer(testContainerID));
BlockID blockID = ContainerTestHelper.
getTestBlockID(testContainerID);
Pipeline pipeline = createSingleNodePipeline();
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
Container container = containerSet.getContainer(testContainerID);
BlockData blockData = openContainerBlockMap.
getBlockDataMap(testContainerID).get(blockID.getLocalID());
// the key should exist in the map
Assert.assertNotNull(
openContainerBlockMap.getBlockDataMap(testContainerID)
.get(blockID.getLocalID()));
Assert.assertTrue(
blockData.getChunks().size() == chunkList.size());
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setContainerID(blockID.getContainerID());
request.setCloseContainer(
ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
dispatcher.dispatch(request.build());
Assert.assertNull(
openContainerBlockMap.getBlockDataMap(testContainerID));
// Make sure the key got committed
Assert.assertNotNull(handler.getBlockManager()
.getBlock(container, blockID));
}
}
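The entire TestCloseContainerHandler class is deleted rather than adapted because every assertion inspects openContainerBlockMap state. A map-free close-path check would have to commit explicitly and then read back through the block-manager hook the old test already used; a hypothetical sketch reusing names from the deleted tests above (putBlockRequest and closeRequest stand in for the request builders shown there):

    dispatcher.dispatch(putBlockRequest.build());   // commit explicitly first
    dispatcher.dispatch(closeRequest.build());      // then close the container
    // Committed block metadata must survive the close:
    Assert.assertNotNull(
        handler.getBlockManager().getBlock(container, blockID));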

Integration test for GetCommittedBlockLength and PutKey (file name not preserved in this view)

@@ -117,42 +117,6 @@ public void tesGetCommittedBlockLength() throws Exception {
xceiverClientManager.releaseClient(client);
}
@Test
public void tesGetCommittedBlockLengthWithClosedContainer()
throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID();
Pipeline pipeline = container.getPipeline();
XceiverClientSpi client =
xceiverClientManager.acquireClient(pipeline);
// create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID);
byte[] data =
RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes();
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper
.getWriteChunkRequest(container.getPipeline(), blockID,
data.length);
client.sendCommand(writeChunkRequest);
// close the container
ContainerProtocolCalls.closeContainer(client, containerID, traceID);
ContainerProtos.GetCommittedBlockLengthResponseProto response =
ContainerProtocolCalls
.getCommittedBlockLength(client, blockID, traceID);
// make sure the block ids in the request and response are the same.
// This will also ensure that closing the container committed the block
// on the Datanodes.
Assert.assertTrue(
BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
Assert.assertTrue(response.getBlockLength() == data.length);
xceiverClientManager.releaseClient(client);
}
@Test
public void testGetCommittedBlockLengthForInvalidBlock() throws Exception {
String traceID = UUID.randomUUID().toString();
@@ -178,41 +142,6 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception {
xceiverClientManager.releaseClient(client);
}
@Test
public void testGetCommittedBlockLengthForOpenBlock() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID();
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls
.createContainer(client, containerID, traceID);
BlockID blockID =
ContainerTestHelper.getTestBlockID(containerID);
ContainerProtos.ContainerCommandRequestProto requestProto =
ContainerTestHelper
.getWriteChunkRequest(container.getPipeline(), blockID, 1024);
client.sendCommand(requestProto);
try {
ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
Assert.fail("Expected Exception not thrown");
} catch (StorageContainerException sce) {
Assert.assertEquals(ContainerProtos.Result.BLOCK_NOT_COMMITTED,
sce.getResult());
}
// now close the container; it should auto-commit pending open blocks
ContainerProtocolCalls
.closeContainer(client, containerID, traceID);
ContainerProtos.GetCommittedBlockLengthResponseProto response =
ContainerProtocolCalls
.getCommittedBlockLength(client, blockID, traceID);
Assert.assertTrue(response.getBlockLength() == 1024);
xceiverClientManager.releaseClient(client);
}
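Both deleted tests encoded the old contract: closing a container auto-committed pending open blocks, and querying an uncommitted block returned BLOCK_NOT_COMMITTED. Under the new contract a client must commit explicitly before asking for the committed length. A hedged sketch of the adjusted flow, assuming a ContainerTestHelper.getPutBlockRequest helper with this shape (it is not shown in this diff); requestProto is the write-chunk request from the deleted test above:

    // Commit the written chunk explicitly, then query the committed length.
    ContainerProtos.ContainerCommandRequestProto putBlockRequest =
        ContainerTestHelper.getPutBlockRequest(
            container.getPipeline(), requestProto.getWriteChunk());
    client.sendCommand(putBlockRequest);
    ContainerProtos.GetCommittedBlockLengthResponseProto response =
        ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
    Assert.assertEquals(1024, response.getBlockLength());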
@Test
public void tesPutKeyResposne() throws Exception {
ContainerProtos.PutBlockResponseProto response;