From bbe2f6225ea500651de04c064f7b847be18e5b66 Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Mon, 23 Jul 2018 09:12:47 +0530 Subject: [PATCH] HDDS-181. CloseContainer should commit all pending open Keys on a datanode. Contributed by Shashikant Banerjee. --- .../container/common/helpers/KeyData.java | 20 +- .../common/impl/OpenContainerBlockMap.java | 167 +++++++++++ .../container/keyvalue/KeyValueHandler.java | 69 ++++- .../impl/TestCloseContainerHandler.java | 260 ++++++++++++++++++ 4 files changed, 504 insertions(+), 12 deletions(-) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java index 129e4a8fed..b63332fa10 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.ArrayList; /** * Helper class to convert Protobuf to Java classes. @@ -130,8 +131,26 @@ public List getChunks() { return chunks; } + /** + * Adds chinkInfo to the list + */ + public void addChunk(ContainerProtos.ChunkInfo chunkInfo) { + if (chunks == null) { + chunks = new ArrayList<>(); + } + chunks.add(chunkInfo); + } + + /** + * removes the chunk. + */ + public void removeChunk(ContainerProtos.ChunkInfo chunkInfo) { + chunks.remove(chunkInfo); + } + /** * Returns container ID. + * * @return long. */ public long getContainerID() { @@ -170,5 +189,4 @@ public void setChunks(List chunks) { public long getSize() { return chunks.parallelStream().mapToLong(e->e.getLen()).sum(); } - } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java new file mode 100644 index 0000000000..ab5f861e58 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * This class will maintain list of open keys per container when closeContainer + * command comes, it should autocommit all open keys of a open container before + * marking the container as closed. + */ +public class OpenContainerBlockMap { + + /** + * TODO : We may construct the openBlockMap by reading the Block Layout + * for each block inside a container listing all chunk files and reading the + * sizes. This will help to recreate the openKeys Map once the DataNode + * restarts. + * + * For now, we will track all open blocks of a container in the blockMap. + */ + private final ConcurrentHashMap> + openContainerBlockMap; + + /** + * Constructs OpenContainerBlockMap. + */ + public OpenContainerBlockMap() { + openContainerBlockMap = new ConcurrentHashMap<>(); + } + /** + * Removes the Container matching with specified containerId. + * @param containerId containerId + */ + public void removeContainer(long containerId) { + Preconditions + .checkState(containerId >= 0, "Container Id cannot be negative."); + openContainerBlockMap.computeIfPresent(containerId, (k, v) -> null); + } + + /** + * updates the chunkInfoList in case chunk is added or deleted + * @param blockID id of the block. + * @param info - Chunk Info + * @param remove if true, deletes the chunkInfo list otherwise appends to the + * chunkInfo List + * @throws IOException + */ + public synchronized void updateOpenKeyMap(BlockID blockID, + ContainerProtos.ChunkInfo info, boolean remove) throws IOException { + if (remove) { + deleteChunkFromMap(blockID, info); + } else { + addChunkToMap(blockID, info); + } + } + + private KeyData getKeyData(ContainerProtos.ChunkInfo info, BlockID blockID) + throws IOException { + KeyData keyData = new KeyData(blockID); + keyData.addMetadata("TYPE", "KEY"); + keyData.addChunk(info); + return keyData; + } + + private void addChunkToMap(BlockID blockID, ContainerProtos.ChunkInfo info) + throws IOException { + Preconditions.checkNotNull(info); + long containerId = blockID.getContainerID(); + long localID = blockID.getLocalID(); + + KeyData keyData = openContainerBlockMap.computeIfAbsent(containerId, + emptyMap -> new LinkedHashMap()) + .putIfAbsent(localID, getKeyData(info, blockID)); + // KeyData != null means the block already exist + if (keyData != null) { + HashMap keyDataSet = + openContainerBlockMap.get(containerId); + keyDataSet.putIfAbsent(blockID.getLocalID(), getKeyData(info, blockID)); + keyDataSet.computeIfPresent(blockID.getLocalID(), (key, value) -> { + value.addChunk(info); + return value; + }); + } + } + + /** + * removes the chunks from the chunkInfo list for the given block. + * @param blockID id of the block + * @param chunkInfo chunk info. + */ + private synchronized void deleteChunkFromMap(BlockID blockID, + ContainerProtos.ChunkInfo chunkInfo) { + Preconditions.checkNotNull(chunkInfo); + Preconditions.checkNotNull(blockID); + HashMap keyDataMap = + openContainerBlockMap.get(blockID.getContainerID()); + if (keyDataMap != null) { + long localId = blockID.getLocalID(); + KeyData keyData = keyDataMap.get(localId); + if (keyData != null) { + keyData.removeChunk(chunkInfo); + } + } + } + + /** + * returns the list of open to the openContainerBlockMap + * @param containerId container id + * @return List of open Keys(blocks) + */ + public List getOpenKeys(long containerId) { + HashMap keyDataHashMap = + openContainerBlockMap.get(containerId); + return keyDataHashMap == null ? null : + keyDataHashMap.values().stream().collect(Collectors.toList()); + } + + /** + * removes the block from the block map. + * @param blockID + */ + public synchronized void removeFromKeyMap(BlockID blockID) { + Preconditions.checkNotNull(blockID); + HashMap keyDataMap = + openContainerBlockMap.get(blockID.getContainerID()); + if (keyDataMap != null) { + keyDataMap.remove(blockID.getLocalID()); + if (keyDataMap.size() == 0) { + removeContainer(blockID.getContainerID()); + } + } + } + + @VisibleForTesting + public ConcurrentHashMap> getContainerOpenKeyMap() { + return openContainerBlockMap; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 84b3644970..9aa3df7a36 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -45,6 +45,7 @@ .StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; +import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils; @@ -117,7 +118,7 @@ public class KeyValueHandler extends Handler { private VolumeChoosingPolicy volumeChoosingPolicy; private final int maxContainerSizeGB; private final AutoCloseableLock handlerLock; - + private final OpenContainerBlockMap openContainerBlockMap; public KeyValueHandler(Configuration config, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) { @@ -145,6 +146,15 @@ public KeyValueHandler(Configuration config, ContainerSet contSet, // this handler lock is used for synchronizing createContainer Requests, // so using a fair lock here. handlerLock = new AutoCloseableLock(new ReentrantLock(true)); + openContainerBlockMap = new OpenContainerBlockMap(); + } + + /** + * Returns OpenContainerBlockMap instance + * @return OpenContainerBlockMap + */ + public OpenContainerBlockMap getOpenContainerBlockMap() { + return openContainerBlockMap; } @Override @@ -333,8 +343,9 @@ ContainerCommandResponseProto handleDeleteContainer( "Container cannot be deleted because it is not empty.", ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY); } else { - containerSet.removeContainer( - kvContainer.getContainerData().getContainerID()); + long containerId = kvContainer.getContainerData().getContainerID(); + containerSet.removeContainer(containerId); + openContainerBlockMap.removeContainer(containerId); // Release the lock first. // Avoid holding write locks for disk operations kvContainer.writeUnlock(); @@ -366,9 +377,21 @@ ContainerCommandResponseProto handleCloseContainer( try { checkContainerOpen(kvContainer); + // remove the container from open block map once, all the blocks + // have been committed and the container is closed + kvContainer.getContainerData() + .setState(ContainerProtos.ContainerLifeCycleState.CLOSING); + commitPendingKeys(kvContainer); kvContainer.close(); + // make sure the the container open keys from BlockMap gets removed + openContainerBlockMap.removeContainer( + request.getCloseContainer().getContainerID()); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); + } catch (IOException ex) { + return ContainerUtils.logAndReturnError(LOG, + new StorageContainerException("Close Container failed", ex, + IO_EXCEPTION), request); } return ContainerUtils.getSuccessResponse(request); @@ -391,10 +414,8 @@ ContainerCommandResponseProto handlePutKey( KeyData keyData = KeyData.getFromProtoBuf( request.getPutKey().getKeyData()); - Preconditions.checkNotNull(keyData); - - keyManager.putKey(kvContainer, keyData); long numBytes = keyData.getProtoBufMessage().toByteArray().length; + commitKey(keyData, kvContainer); metrics.incContainerBytesStats(Type.PutKey, numBytes); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -407,6 +428,25 @@ ContainerCommandResponseProto handlePutKey( return KeyUtils.getKeyResponseSuccess(request); } + private void commitPendingKeys(KeyValueContainer kvContainer) + throws IOException { + long containerId = kvContainer.getContainerData().getContainerID(); + List pendingKeys = + this.openContainerBlockMap.getOpenKeys(containerId); + if (pendingKeys != null) { + for (KeyData keyData : pendingKeys) { + commitKey(keyData, kvContainer); + } + } + } + + private void commitKey(KeyData keyData, KeyValueContainer kvContainer) + throws IOException { + Preconditions.checkNotNull(keyData); + keyManager.putKey(kvContainer, keyData); + //update the open key Map in containerManager + this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID()); + } /** * Handle Get Key operation. Calls KeyManager to process the request. */ @@ -519,11 +559,13 @@ ContainerCommandResponseProto handleDeleteChunk( BlockID blockID = BlockID.getFromProtobuf( request.getDeleteChunk().getBlockID()); - ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getDeleteChunk() - .getChunkData()); + ContainerProtos.ChunkInfo chunkInfoProto = request.getDeleteChunk() + .getChunkData(); + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); Preconditions.checkNotNull(chunkInfo); chunkManager.deleteChunk(kvContainer, blockID, chunkInfo); + openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, true); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -552,8 +594,9 @@ ContainerCommandResponseProto handleWriteChunk( BlockID blockID = BlockID.getFromProtobuf( request.getWriteChunk().getBlockID()); - ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getWriteChunk() - .getChunkData()); + ContainerProtos.ChunkInfo chunkInfoProto = + request.getWriteChunk().getChunkData(); + ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); Preconditions.checkNotNull(chunkInfo); byte[] data = null; @@ -570,6 +613,9 @@ ContainerCommandResponseProto handleWriteChunk( request.getWriteChunk().getStage() == Stage.COMBINED) { metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk() .getChunkData().getLen()); + // the openContainerBlockMap should be updated only while writing data + // not during COMMIT_STAGE of handling write chunk request. + openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, false); } } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -610,8 +656,9 @@ ContainerCommandResponseProto handlePutSmallFile( ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf( putSmallFileReq.getChunkInfo()); Preconditions.checkNotNull(chunkInfo); - byte[] data = putSmallFileReq.getData().toByteArray(); + // chunks will be committed as a part of handling putSmallFile + // here. There is no need to maintain this info in openContainerBlockMap. chunkManager.writeChunk( kvContainer, blockID, chunkInfo, data, Stage.COMBINED); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java new file mode 100644 index 0000000000..3ab593e6dd --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.impl; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.Assert; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.LinkedList; +import static org.apache.hadoop.ozone.container.ContainerTestHelper + .createSingleNodePipeline; +import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; +import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; +import static org.apache.hadoop.ozone.container.ContainerTestHelper + .setDataChecksum; + +/** + * Simple tests to verify that closeContainer handler on Datanode. + */ +public class TestCloseContainerHandler { + + @Rule + public TestRule timeout = new Timeout(300000); + + private static Configuration conf; + private static HddsDispatcher dispatcher; + private static ContainerSet containerSet; + private static VolumeSet volumeSet; + private static KeyValueHandler handler; + private static OpenContainerBlockMap openContainerBlockMap; + + private final static String DATANODE_UUID = UUID.randomUUID().toString(); + + private static final String baseDir = MiniDFSCluster.getBaseDirectory(); + private static final String volume1 = baseDir + "disk1"; + private static final String volume2 = baseDir + "disk2"; + + @BeforeClass + public static void setup() throws Exception { + conf = new Configuration(); + String dataDirKey = volume1 + "," + volume2; + conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirKey); + containerSet = new ContainerSet(); + DatanodeDetails datanodeDetails = + DatanodeDetails.newBuilder().setUuid(DATANODE_UUID) + .setHostName("localhost").setIpAddress("127.0.0.1").build(); + volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); + + dispatcher = new HddsDispatcher(conf, containerSet, volumeSet); + handler = (KeyValueHandler) dispatcher + .getHandler(ContainerProtos.ContainerType.KeyValueContainer); + openContainerBlockMap = handler.getOpenContainerBlockMap(); + dispatcher.setScmId(UUID.randomUUID().toString()); + } + + @AfterClass + public static void shutdown() throws IOException { + // Delete the hdds volume root dir + List volumes = new ArrayList<>(); + volumes.addAll(volumeSet.getVolumesList()); + volumes.addAll(volumeSet.getFailedVolumesList()); + + for (HddsVolume volume : volumes) { + FileUtils.deleteDirectory(volume.getHddsRootDir()); + } + volumeSet.shutdown(); + } + + private long createContainer() { + long testContainerId = ContainerTestHelper.getTestContainerID(); + ContainerProtos.CreateContainerRequestProto createReq = + ContainerProtos.CreateContainerRequestProto.newBuilder() + .setContainerID(testContainerId) + .build(); + + ContainerProtos.ContainerCommandRequestProto request = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.CreateContainer) + .setDatanodeUuid(DATANODE_UUID) + .setCreateContainer(createReq) + .build(); + + dispatcher.dispatch(request); + return testContainerId; + } + + private List writeChunkBuilder(BlockID blockID, Pipeline pipeline, + int chunkCount) + throws IOException, NoSuchAlgorithmException { + final int datalen = 1024; + long testContainerID = blockID.getContainerID(); + List chunkList = new LinkedList<>(); + for (int x = 0; x < chunkCount; x++) { + ChunkInfo info = getChunk(blockID.getLocalID(), x, datalen * x, datalen); + byte[] data = getData(datalen); + setDataChecksum(info, data); + ContainerProtos.WriteChunkRequestProto.Builder writeRequest = + ContainerProtos.WriteChunkRequestProto.newBuilder(); + writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf()); + writeRequest.setChunkData(info.getProtoBufMessage()); + writeRequest.setData(ByteString.copyFrom(data)); + writeRequest.setStage(ContainerProtos.Stage.COMBINED); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.WriteChunk); + request.setWriteChunk(writeRequest); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid(pipeline.getLeader().getUuidString()); + dispatcher.dispatch(request.build()); + chunkList.add(info); + } + return chunkList; + } + + @Test + public void testPutKeyWithMultipleChunks() + throws IOException, NoSuchAlgorithmException { + long testContainerID = createContainer(); + Assert.assertNotNull(containerSet.getContainer(testContainerID)); + BlockID blockID = ContainerTestHelper. + getTestBlockID(testContainerID); + Pipeline pipeline = createSingleNodePipeline(); + List chunkList = writeChunkBuilder(blockID, pipeline, 3); + // the key should exist in the map + Assert.assertTrue( + openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID) + .containsKey(blockID.getLocalID())); + KeyData keyData = new KeyData(blockID); + List chunkProtoList = new LinkedList<>(); + for (ChunkInfo i : chunkList) { + chunkProtoList.add(i.getProtoBufMessage()); + } + keyData.setChunks(chunkProtoList); + ContainerProtos.PutKeyRequestProto.Builder putKeyRequestProto = + ContainerProtos.PutKeyRequestProto.newBuilder(); + putKeyRequestProto.setKeyData(keyData.getProtoBufMessage()); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.PutKey); + request.setPutKey(putKeyRequestProto); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid(pipeline.getLeader().getUuidString()); + dispatcher.dispatch(request.build()); + + //the open key should be removed from Map + Assert.assertNull( + openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)); + } + + @Test + public void testDeleteChunk() throws Exception { + long testContainerID = createContainer(); + Assert.assertNotNull(containerSet.getContainer(testContainerID)); + BlockID blockID = ContainerTestHelper. + getTestBlockID(testContainerID); + Pipeline pipeline = createSingleNodePipeline(); + List chunkList = writeChunkBuilder(blockID, pipeline, 3); + // the key should exist in the map + Assert.assertTrue( + openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID) + .containsKey(blockID.getLocalID())); + Assert.assertTrue( + openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID) + .get(blockID.getLocalID()).getChunks().size() == 3); + ContainerProtos.DeleteChunkRequestProto.Builder deleteChunkProto = + ContainerProtos.DeleteChunkRequestProto.newBuilder(); + deleteChunkProto.setBlockID(blockID.getDatanodeBlockIDProtobuf()); + deleteChunkProto.setChunkData(chunkList.get(0).getProtoBufMessage()); + ContainerProtos.WriteChunkRequestProto.Builder writeRequest = + ContainerProtos.WriteChunkRequestProto.newBuilder(); + writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf()); + writeRequest.setChunkData(chunkList.get(0).getProtoBufMessage()); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.DeleteChunk); + request.setDeleteChunk(deleteChunkProto); + request.setWriteChunk(writeRequest); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid(pipeline.getLeader().getUuidString()); + dispatcher.dispatch(request.build()); + Assert.assertTrue( + openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID) + .get(blockID.getLocalID()).getChunks().size() == 2); + + } + + @Test + public void testCloseContainer() throws Exception { + long testContainerID = createContainer(); + Assert.assertNotNull(containerSet.getContainer(testContainerID)); + BlockID blockID = ContainerTestHelper. + getTestBlockID(testContainerID); + Pipeline pipeline = createSingleNodePipeline(); + List chunkList = writeChunkBuilder(blockID, pipeline, 3); + + Container container = containerSet.getContainer(testContainerID); + KeyData keyData = openContainerBlockMap.getContainerOpenKeyMap(). + get(testContainerID).get(blockID.getLocalID()); + // the key should exist in the map + Assert.assertTrue( + openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID) + .containsKey(blockID.getLocalID())); + Assert.assertTrue( + keyData.getChunks().size() == chunkList.size()); + ContainerProtos.CloseContainerRequestProto.Builder closeContainerProto = + ContainerProtos.CloseContainerRequestProto.newBuilder(); + closeContainerProto.setContainerID(blockID.getContainerID()); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.CloseContainer); + request.setCloseContainer(closeContainerProto); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid(pipeline.getLeader().getUuidString()); + dispatcher.dispatch(request.build()); + Assert.assertNull( + openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)); + // Make sure the key got committed + Assert.assertNotNull(handler.getKeyManager().getKey(container, blockID)); + } +} \ No newline at end of file