From b57cc73f837ecb79ed275fc6e50ffce684baf573 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Wed, 14 Nov 2018 20:05:56 +0530 Subject: [PATCH] HDDS-774. Remove OpenContainerBlockMap from datanode. Contributed by Shashikant Banerjee. --- .../container/keyvalue/KeyValueHandler.java | 54 +--- .../impl/TestCloseContainerHandler.java | 261 ------------------ .../TestGetCommittedBlockLengthAndPutKey.java | 71 ----- 3 files changed, 2 insertions(+), 384 deletions(-) delete mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index d8c23bf5b0..f970c72ee5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -54,7 +54,6 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; @@ -109,7 +108,6 @@ public class KeyValueHandler extends Handler { private final VolumeChoosingPolicy volumeChoosingPolicy; private final long maxContainerSize; private final AutoCloseableLock handlerLock; - private final OpenContainerBlockMap openContainerBlockMap; public KeyValueHandler(Configuration config, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) { @@ -138,21 +136,12 @@ public KeyValueHandler(Configuration config, ContainerSet contSet, // this handler lock is used for synchronizing createContainer Requests, // so using a fair lock here. handlerLock = new AutoCloseableLock(new ReentrantLock(true)); - openContainerBlockMap = new OpenContainerBlockMap(); } @VisibleForTesting public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() { return volumeChoosingPolicy; } - /** - * Returns OpenContainerBlockMap instance. - * - * @return OpenContainerBlockMap - */ - public OpenContainerBlockMap getOpenContainerBlockMap() { - return openContainerBlockMap; - } @Override public ContainerCommandResponseProto handle( @@ -355,7 +344,6 @@ ContainerCommandResponseProto handleDeleteContainer( } else { long containerId = kvContainer.getContainerData().getContainerID(); containerSet.removeContainer(containerId); - openContainerBlockMap.removeContainer(containerId); // Release the lock first. // Avoid holding write locks for disk operations kvContainer.writeUnlock(); @@ -388,19 +376,11 @@ ContainerCommandResponseProto handleCloseContainer( long containerID = kvContainer.getContainerData().getContainerID(); try { checkContainerOpen(kvContainer); - KeyValueContainerData kvData = kvContainer.getContainerData(); - - // remove the container from open block map once, all the blocks - // have been committed and the container is closed - commitPendingBlocks(kvContainer); - // TODO : The close command should move the container to either quasi // closed/closed depending upon how the closeContainer gets executed. // If it arrives by Standalone, it will be moved to Quasi Closed or // otherwise moved to Closed state if it gets executed via Ratis. kvContainer.close(); - // make sure the the container open keys from BlockMap gets removed - openContainerBlockMap.removeContainer(kvData.getContainerID()); } catch (StorageContainerException ex) { if (ex.getResult() == CLOSED_CONTAINER_IO) { LOG.debug("Container {} is already closed.", containerID); @@ -434,8 +414,9 @@ ContainerCommandResponseProto handlePutBlock( BlockData blockData = BlockData.getFromProtoBuf( request.getPutBlock().getBlockData()); + Preconditions.checkNotNull(blockData); long numBytes = blockData.getProtoBufMessage().toByteArray().length; - blockLength = commitKey(blockData, kvContainer); + blockLength = blockManager.putBlock(kvContainer, blockData); metrics.incContainerBytesStats(Type.PutBlock, numBytes); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -448,24 +429,6 @@ ContainerCommandResponseProto handlePutBlock( return BlockUtils.putBlockResponseSuccess(request, blockLength); } - private void commitPendingBlocks(KeyValueContainer kvContainer) - throws IOException { - long containerId = kvContainer.getContainerData().getContainerID(); - List pendingBlocks = - this.openContainerBlockMap.getOpenBlocks(containerId); - for(BlockData blockData : pendingBlocks) { - commitKey(blockData, kvContainer); - } - } - - private long commitKey(BlockData blockData, KeyValueContainer kvContainer) - throws IOException { - Preconditions.checkNotNull(blockData); - long length = blockManager.putBlock(kvContainer, blockData); - //update the open key Map in containerManager - this.openContainerBlockMap.removeFromBlockMap(blockData.getBlockID()); - return length; - } /** * Handle Get Block operation. Calls BlockManager to process the request. */ @@ -513,11 +476,6 @@ ContainerCommandResponseProto handleGetCommittedBlockLength( try { BlockID blockID = BlockID .getFromProtobuf(request.getGetCommittedBlockLength().getBlockID()); - // Check if it really exists in the openContainerBlockMap - if (openContainerBlockMap.checkIfBlockExists(blockID)) { - String msg = "Block " + blockID + " is not committed yet."; - throw new StorageContainerException(msg, BLOCK_NOT_COMMITTED); - } blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -617,7 +575,6 @@ ContainerCommandResponseProto handleDeleteChunk( Preconditions.checkNotNull(chunkInfo); chunkManager.deleteChunk(kvContainer, blockID, chunkInfo); - openContainerBlockMap.removeChunk(blockID, chunkInfoProto); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -666,13 +623,6 @@ ContainerCommandResponseProto handleWriteChunk( metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk() .getChunkData().getLen()); } - - if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA - || request.getWriteChunk().getStage() == Stage.COMBINED) { - // the openContainerBlockMap should be updated only during - // COMMIT_STAGE of handling write chunk request. - openContainerBlockMap.addChunk(blockID, chunkInfoProto); - } } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java deleted file mode 100644 index 0ae63e3101..0000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.container.common.impl; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.ozone.container.common.helpers.BlockData; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; -import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.Assert; -import org.junit.rules.TestRule; -import org.junit.rules.Timeout; - -import java.io.IOException; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.LinkedList; -import static org.apache.hadoop.ozone.container.ContainerTestHelper - .createSingleNodePipeline; -import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; -import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; -import static org.apache.hadoop.ozone.container.ContainerTestHelper - .setDataChecksum; - -/** - * Simple tests to verify that closeContainer handler on Datanode. - */ -public class TestCloseContainerHandler { - - @Rule - public TestRule timeout = new Timeout(300000); - - private static Configuration conf; - private static HddsDispatcher dispatcher; - private static ContainerSet containerSet; - private static VolumeSet volumeSet; - private static KeyValueHandler handler; - private static OpenContainerBlockMap openContainerBlockMap; - - private final static String DATANODE_UUID = UUID.randomUUID().toString(); - - private static final String BASE_DIR = MiniDFSCluster.getBaseDirectory(); - private static final String VOLUME_1 = BASE_DIR + "disk1"; - private static final String VOLUME_2 = BASE_DIR + "disk2"; - - @BeforeClass - public static void setup() throws Exception { - conf = new Configuration(); - String dataDirKey = VOLUME_1 + "," + VOLUME_2; - conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirKey); - containerSet = new ContainerSet(); - DatanodeDetails datanodeDetails = - DatanodeDetails.newBuilder().setUuid(DATANODE_UUID) - .setHostName("localhost").setIpAddress("127.0.0.1").build(); - volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); - - dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null); - handler = (KeyValueHandler) dispatcher - .getHandler(ContainerProtos.ContainerType.KeyValueContainer); - openContainerBlockMap = handler.getOpenContainerBlockMap(); - dispatcher.setScmId(UUID.randomUUID().toString()); - } - - @AfterClass - public static void shutdown() throws IOException { - // Delete the hdds volume root dir - List volumes = new ArrayList<>(); - volumes.addAll(volumeSet.getVolumesList()); - volumes.addAll(volumeSet.getFailedVolumesList()); - - for (HddsVolume volume : volumes) { - FileUtils.deleteDirectory(volume.getHddsRootDir()); - } - volumeSet.shutdown(); - } - - private long createContainer() { - long testContainerId = ContainerTestHelper.getTestContainerID(); - - ContainerProtos.ContainerCommandRequestProto request = - ContainerProtos.ContainerCommandRequestProto.newBuilder() - .setCmdType(ContainerProtos.Type.CreateContainer) - .setContainerID(testContainerId) - .setDatanodeUuid(DATANODE_UUID) - .setCreateContainer(ContainerProtos.CreateContainerRequestProto - .getDefaultInstance()) - .build(); - - dispatcher.dispatch(request); - return testContainerId; - } - - private List writeChunkBuilder(BlockID blockID, Pipeline pipeline, - int chunkCount) - throws IOException, NoSuchAlgorithmException { - final int datalen = 1024; - long testContainerID = blockID.getContainerID(); - List chunkList = new LinkedList<>(); - for (int x = 0; x < chunkCount; x++) { - ChunkInfo info = getChunk(blockID.getLocalID(), x, datalen * x, datalen); - byte[] data = getData(datalen); - setDataChecksum(info, data); - ContainerProtos.WriteChunkRequestProto.Builder writeRequest = - ContainerProtos.WriteChunkRequestProto.newBuilder(); - writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf()); - writeRequest.setChunkData(info.getProtoBufMessage()); - writeRequest.setData(ByteString.copyFrom(data)); - writeRequest.setStage(ContainerProtos.Stage.COMBINED); - ContainerProtos.ContainerCommandRequestProto.Builder request = - ContainerProtos.ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.WriteChunk); - request.setContainerID(blockID.getContainerID()); - request.setWriteChunk(writeRequest); - request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid(pipeline.getFirstNode().getUuidString()); - dispatcher.dispatch(request.build()); - chunkList.add(info); - } - return chunkList; - } - - @Test - public void testPutKeyWithMultipleChunks() - throws IOException, NoSuchAlgorithmException { - long testContainerID = createContainer(); - Assert.assertNotNull(containerSet.getContainer(testContainerID)); - BlockID blockID = ContainerTestHelper. - getTestBlockID(testContainerID); - Pipeline pipeline = createSingleNodePipeline(); - List chunkList = writeChunkBuilder(blockID, pipeline, 3); - // the block should exist in the map - Assert.assertNotNull( - openContainerBlockMap.getBlockDataMap(testContainerID) - .get(blockID.getLocalID())); - BlockData blockData = new BlockData(blockID); - List chunkProtoList = new LinkedList<>(); - for (ChunkInfo i : chunkList) { - chunkProtoList.add(i.getProtoBufMessage()); - } - blockData.setChunks(chunkProtoList); - ContainerProtos.PutBlockRequestProto.Builder putBlockRequestProto = - ContainerProtos.PutBlockRequestProto.newBuilder(); - putBlockRequestProto.setBlockData(blockData.getProtoBufMessage()); - ContainerProtos.ContainerCommandRequestProto.Builder request = - ContainerProtos.ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.PutBlock); - request.setContainerID(blockID.getContainerID()); - request.setPutBlock(putBlockRequestProto); - request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid(pipeline.getFirstNode().getUuidString()); - dispatcher.dispatch(request.build()); - - //the open block should be removed from Map - Assert.assertNull( - openContainerBlockMap.getBlockDataMap(testContainerID)); - } - - @Test - public void testDeleteChunk() throws Exception { - long testContainerID = createContainer(); - Assert.assertNotNull(containerSet.getContainer(testContainerID)); - BlockID blockID = ContainerTestHelper. - getTestBlockID(testContainerID); - Pipeline pipeline = createSingleNodePipeline(); - List chunkList = writeChunkBuilder(blockID, pipeline, 3); - // the key should exist in the map - Assert.assertNotNull( - openContainerBlockMap.getBlockDataMap(testContainerID) - .get(blockID.getLocalID())); - Assert.assertTrue( - openContainerBlockMap.getBlockDataMap(testContainerID) - .get(blockID.getLocalID()).getChunks().size() == 3); - ContainerProtos.DeleteChunkRequestProto.Builder deleteChunkProto = - ContainerProtos.DeleteChunkRequestProto.newBuilder(); - deleteChunkProto.setBlockID(blockID.getDatanodeBlockIDProtobuf()); - deleteChunkProto.setChunkData(chunkList.get(0).getProtoBufMessage()); - ContainerProtos.WriteChunkRequestProto.Builder writeRequest = - ContainerProtos.WriteChunkRequestProto.newBuilder(); - writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf()); - writeRequest.setChunkData(chunkList.get(0).getProtoBufMessage()); - ContainerProtos.ContainerCommandRequestProto.Builder request = - ContainerProtos.ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.DeleteChunk); - request.setContainerID(blockID.getContainerID()); - request.setDeleteChunk(deleteChunkProto); - request.setWriteChunk(writeRequest); - request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid(pipeline.getFirstNode().getUuidString()); - dispatcher.dispatch(request.build()); - Assert.assertTrue( - openContainerBlockMap.getBlockDataMap(testContainerID) - .get(blockID.getLocalID()).getChunks().size() == 2); - - } - - @Test - public void testCloseContainer() throws Exception { - long testContainerID = createContainer(); - Assert.assertNotNull(containerSet.getContainer(testContainerID)); - BlockID blockID = ContainerTestHelper. - getTestBlockID(testContainerID); - Pipeline pipeline = createSingleNodePipeline(); - List chunkList = writeChunkBuilder(blockID, pipeline, 3); - - Container container = containerSet.getContainer(testContainerID); - BlockData blockData = openContainerBlockMap. - getBlockDataMap(testContainerID).get(blockID.getLocalID()); - // the key should exist in the map - Assert.assertNotNull( - openContainerBlockMap.getBlockDataMap(testContainerID) - .get(blockID.getLocalID())); - Assert.assertTrue( - blockData.getChunks().size() == chunkList.size()); - ContainerProtos.ContainerCommandRequestProto.Builder request = - ContainerProtos.ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.CloseContainer); - request.setContainerID(blockID.getContainerID()); - request.setCloseContainer( - ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); - request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid(pipeline.getFirstNode().getUuidString()); - dispatcher.dispatch(request.build()); - Assert.assertNull( - openContainerBlockMap.getBlockDataMap(testContainerID)); - // Make sure the key got committed - Assert.assertNotNull(handler.getBlockManager() - .getBlock(container, blockID)); - } -} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index 974bb97f05..b4601fa4d0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -117,42 +117,6 @@ public void tesGetCommittedBlockLength() throws Exception { xceiverClientManager.releaseClient(client); } - @Test - public void tesGetCommittedBlockLengthWithClosedContainer() - throws Exception { - String traceID = UUID.randomUUID().toString(); - ContainerWithPipeline container = storageContainerLocationClient - .allocateContainer(xceiverClientManager.getType(), - HddsProtos.ReplicationFactor.ONE, containerOwner); - long containerID = container.getContainerInfo().getContainerID(); - Pipeline pipeline = container.getPipeline(); - XceiverClientSpi client = - xceiverClientManager.acquireClient(pipeline); - // create the container - ContainerProtocolCalls.createContainer(client, containerID, traceID); - - byte[] data = - RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes(); - BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); - ContainerProtos.ContainerCommandRequestProto writeChunkRequest = - ContainerTestHelper - .getWriteChunkRequest(container.getPipeline(), blockID, - data.length); - client.sendCommand(writeChunkRequest); - // close the container - ContainerProtocolCalls.closeContainer(client, containerID, traceID); - ContainerProtos.GetCommittedBlockLengthResponseProto response = - ContainerProtocolCalls - .getCommittedBlockLength(client, blockID, traceID); - // make sure the block ids in the request and response are same. - // This will also ensure that closing the container committed the block - // on the Datanodes. - Assert.assertTrue( - BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); - Assert.assertTrue(response.getBlockLength() == data.length); - xceiverClientManager.releaseClient(client); - } - @Test public void testGetCommittedBlockLengthForInvalidBlock() throws Exception { String traceID = UUID.randomUUID().toString(); @@ -178,41 +142,6 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception { xceiverClientManager.releaseClient(client); } - @Test - public void testGetCommittedBlockLengthForOpenBlock() throws Exception { - String traceID = UUID.randomUUID().toString(); - ContainerWithPipeline container = storageContainerLocationClient - .allocateContainer(xceiverClientManager.getType(), - HddsProtos.ReplicationFactor.ONE, containerOwner); - long containerID = container.getContainerInfo().getContainerID(); - XceiverClientSpi client = xceiverClientManager - .acquireClient(container.getPipeline()); - ContainerProtocolCalls - .createContainer(client, containerID, traceID); - - BlockID blockID = - ContainerTestHelper.getTestBlockID(containerID); - ContainerProtos.ContainerCommandRequestProto requestProto = - ContainerTestHelper - .getWriteChunkRequest(container.getPipeline(), blockID, 1024); - client.sendCommand(requestProto); - try { - ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID); - Assert.fail("Expected Exception not thrown"); - } catch (StorageContainerException sce) { - Assert.assertEquals(ContainerProtos.Result.BLOCK_NOT_COMMITTED, - sce.getResult()); - } - // now close the container, it should auto commit pending open blocks - ContainerProtocolCalls - .closeContainer(client, containerID, traceID); - ContainerProtos.GetCommittedBlockLengthResponseProto response = - ContainerProtocolCalls - .getCommittedBlockLength(client, blockID, traceID); - Assert.assertTrue(response.getBlockLength() == 1024); - xceiverClientManager.releaseClient(client); - } - @Test public void tesPutKeyResposne() throws Exception { ContainerProtos.PutBlockResponseProto response;