HDDS-181. CloseContainer should commit all pending open Keys on a datanode. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
9fa9e301b0
commit
bbe2f6225e
@ -25,6 +25,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
* Helper class to convert Protobuf to Java classes.
|
||||
@ -130,8 +131,26 @@ public List<ContainerProtos.ChunkInfo> getChunks() {
|
||||
return chunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds chinkInfo to the list
|
||||
*/
|
||||
public void addChunk(ContainerProtos.ChunkInfo chunkInfo) {
|
||||
if (chunks == null) {
|
||||
chunks = new ArrayList<>();
|
||||
}
|
||||
chunks.add(chunkInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* removes the chunk.
|
||||
*/
|
||||
public void removeChunk(ContainerProtos.ChunkInfo chunkInfo) {
|
||||
chunks.remove(chunkInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns container ID.
|
||||
*
|
||||
* @return long.
|
||||
*/
|
||||
public long getContainerID() {
|
||||
@ -170,5 +189,4 @@ public void setChunks(List<ContainerProtos.ChunkInfo> chunks) {
|
||||
public long getSize() {
|
||||
return chunks.parallelStream().mapToLong(e->e.getLen()).sum();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,167 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This class will maintain list of open keys per container when closeContainer
|
||||
* command comes, it should autocommit all open keys of a open container before
|
||||
* marking the container as closed.
|
||||
*/
|
||||
public class OpenContainerBlockMap {
|
||||
|
||||
/**
|
||||
* TODO : We may construct the openBlockMap by reading the Block Layout
|
||||
* for each block inside a container listing all chunk files and reading the
|
||||
* sizes. This will help to recreate the openKeys Map once the DataNode
|
||||
* restarts.
|
||||
*
|
||||
* For now, we will track all open blocks of a container in the blockMap.
|
||||
*/
|
||||
private final ConcurrentHashMap<Long, HashMap<Long, KeyData>>
|
||||
openContainerBlockMap;
|
||||
|
||||
/**
|
||||
* Constructs OpenContainerBlockMap.
|
||||
*/
|
||||
public OpenContainerBlockMap() {
|
||||
openContainerBlockMap = new ConcurrentHashMap<>();
|
||||
}
|
||||
/**
|
||||
* Removes the Container matching with specified containerId.
|
||||
* @param containerId containerId
|
||||
*/
|
||||
public void removeContainer(long containerId) {
|
||||
Preconditions
|
||||
.checkState(containerId >= 0, "Container Id cannot be negative.");
|
||||
openContainerBlockMap.computeIfPresent(containerId, (k, v) -> null);
|
||||
}
|
||||
|
||||
/**
|
||||
* updates the chunkInfoList in case chunk is added or deleted
|
||||
* @param blockID id of the block.
|
||||
* @param info - Chunk Info
|
||||
* @param remove if true, deletes the chunkInfo list otherwise appends to the
|
||||
* chunkInfo List
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void updateOpenKeyMap(BlockID blockID,
|
||||
ContainerProtos.ChunkInfo info, boolean remove) throws IOException {
|
||||
if (remove) {
|
||||
deleteChunkFromMap(blockID, info);
|
||||
} else {
|
||||
addChunkToMap(blockID, info);
|
||||
}
|
||||
}
|
||||
|
||||
private KeyData getKeyData(ContainerProtos.ChunkInfo info, BlockID blockID)
|
||||
throws IOException {
|
||||
KeyData keyData = new KeyData(blockID);
|
||||
keyData.addMetadata("TYPE", "KEY");
|
||||
keyData.addChunk(info);
|
||||
return keyData;
|
||||
}
|
||||
|
||||
private void addChunkToMap(BlockID blockID, ContainerProtos.ChunkInfo info)
|
||||
throws IOException {
|
||||
Preconditions.checkNotNull(info);
|
||||
long containerId = blockID.getContainerID();
|
||||
long localID = blockID.getLocalID();
|
||||
|
||||
KeyData keyData = openContainerBlockMap.computeIfAbsent(containerId,
|
||||
emptyMap -> new LinkedHashMap<Long, KeyData>())
|
||||
.putIfAbsent(localID, getKeyData(info, blockID));
|
||||
// KeyData != null means the block already exist
|
||||
if (keyData != null) {
|
||||
HashMap<Long, KeyData> keyDataSet =
|
||||
openContainerBlockMap.get(containerId);
|
||||
keyDataSet.putIfAbsent(blockID.getLocalID(), getKeyData(info, blockID));
|
||||
keyDataSet.computeIfPresent(blockID.getLocalID(), (key, value) -> {
|
||||
value.addChunk(info);
|
||||
return value;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* removes the chunks from the chunkInfo list for the given block.
|
||||
* @param blockID id of the block
|
||||
* @param chunkInfo chunk info.
|
||||
*/
|
||||
private synchronized void deleteChunkFromMap(BlockID blockID,
|
||||
ContainerProtos.ChunkInfo chunkInfo) {
|
||||
Preconditions.checkNotNull(chunkInfo);
|
||||
Preconditions.checkNotNull(blockID);
|
||||
HashMap<Long, KeyData> keyDataMap =
|
||||
openContainerBlockMap.get(blockID.getContainerID());
|
||||
if (keyDataMap != null) {
|
||||
long localId = blockID.getLocalID();
|
||||
KeyData keyData = keyDataMap.get(localId);
|
||||
if (keyData != null) {
|
||||
keyData.removeChunk(chunkInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* returns the list of open to the openContainerBlockMap
|
||||
* @param containerId container id
|
||||
* @return List of open Keys(blocks)
|
||||
*/
|
||||
public List<KeyData> getOpenKeys(long containerId) {
|
||||
HashMap<Long, KeyData> keyDataHashMap =
|
||||
openContainerBlockMap.get(containerId);
|
||||
return keyDataHashMap == null ? null :
|
||||
keyDataHashMap.values().stream().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* removes the block from the block map.
|
||||
* @param blockID
|
||||
*/
|
||||
public synchronized void removeFromKeyMap(BlockID blockID) {
|
||||
Preconditions.checkNotNull(blockID);
|
||||
HashMap<Long, KeyData> keyDataMap =
|
||||
openContainerBlockMap.get(blockID.getContainerID());
|
||||
if (keyDataMap != null) {
|
||||
keyDataMap.remove(blockID.getLocalID());
|
||||
if (keyDataMap.size() == 0) {
|
||||
removeContainer(blockID.getContainerID());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ConcurrentHashMap<Long,
|
||||
HashMap<Long, KeyData>> getContainerOpenKeyMap() {
|
||||
return openContainerBlockMap;
|
||||
}
|
||||
}
|
@ -45,6 +45,7 @@
|
||||
.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
|
||||
import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
|
||||
@ -117,7 +118,7 @@ public class KeyValueHandler extends Handler {
|
||||
private VolumeChoosingPolicy volumeChoosingPolicy;
|
||||
private final int maxContainerSizeGB;
|
||||
private final AutoCloseableLock handlerLock;
|
||||
|
||||
private final OpenContainerBlockMap openContainerBlockMap;
|
||||
|
||||
public KeyValueHandler(Configuration config, ContainerSet contSet,
|
||||
VolumeSet volSet, ContainerMetrics metrics) {
|
||||
@ -145,6 +146,15 @@ public KeyValueHandler(Configuration config, ContainerSet contSet,
|
||||
// this handler lock is used for synchronizing createContainer Requests,
|
||||
// so using a fair lock here.
|
||||
handlerLock = new AutoCloseableLock(new ReentrantLock(true));
|
||||
openContainerBlockMap = new OpenContainerBlockMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns OpenContainerBlockMap instance
|
||||
* @return OpenContainerBlockMap
|
||||
*/
|
||||
public OpenContainerBlockMap getOpenContainerBlockMap() {
|
||||
return openContainerBlockMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -333,8 +343,9 @@ ContainerCommandResponseProto handleDeleteContainer(
|
||||
"Container cannot be deleted because it is not empty.",
|
||||
ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
|
||||
} else {
|
||||
containerSet.removeContainer(
|
||||
kvContainer.getContainerData().getContainerID());
|
||||
long containerId = kvContainer.getContainerData().getContainerID();
|
||||
containerSet.removeContainer(containerId);
|
||||
openContainerBlockMap.removeContainer(containerId);
|
||||
// Release the lock first.
|
||||
// Avoid holding write locks for disk operations
|
||||
kvContainer.writeUnlock();
|
||||
@ -366,9 +377,21 @@ ContainerCommandResponseProto handleCloseContainer(
|
||||
try {
|
||||
checkContainerOpen(kvContainer);
|
||||
|
||||
// remove the container from open block map once, all the blocks
|
||||
// have been committed and the container is closed
|
||||
kvContainer.getContainerData()
|
||||
.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
|
||||
commitPendingKeys(kvContainer);
|
||||
kvContainer.close();
|
||||
// make sure the the container open keys from BlockMap gets removed
|
||||
openContainerBlockMap.removeContainer(
|
||||
request.getCloseContainer().getContainerID());
|
||||
} catch (StorageContainerException ex) {
|
||||
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
||||
} catch (IOException ex) {
|
||||
return ContainerUtils.logAndReturnError(LOG,
|
||||
new StorageContainerException("Close Container failed", ex,
|
||||
IO_EXCEPTION), request);
|
||||
}
|
||||
|
||||
return ContainerUtils.getSuccessResponse(request);
|
||||
@ -391,10 +414,8 @@ ContainerCommandResponseProto handlePutKey(
|
||||
|
||||
KeyData keyData = KeyData.getFromProtoBuf(
|
||||
request.getPutKey().getKeyData());
|
||||
Preconditions.checkNotNull(keyData);
|
||||
|
||||
keyManager.putKey(kvContainer, keyData);
|
||||
long numBytes = keyData.getProtoBufMessage().toByteArray().length;
|
||||
commitKey(keyData, kvContainer);
|
||||
metrics.incContainerBytesStats(Type.PutKey, numBytes);
|
||||
} catch (StorageContainerException ex) {
|
||||
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
||||
@ -407,6 +428,25 @@ ContainerCommandResponseProto handlePutKey(
|
||||
return KeyUtils.getKeyResponseSuccess(request);
|
||||
}
|
||||
|
||||
private void commitPendingKeys(KeyValueContainer kvContainer)
|
||||
throws IOException {
|
||||
long containerId = kvContainer.getContainerData().getContainerID();
|
||||
List<KeyData> pendingKeys =
|
||||
this.openContainerBlockMap.getOpenKeys(containerId);
|
||||
if (pendingKeys != null) {
|
||||
for (KeyData keyData : pendingKeys) {
|
||||
commitKey(keyData, kvContainer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
|
||||
throws IOException {
|
||||
Preconditions.checkNotNull(keyData);
|
||||
keyManager.putKey(kvContainer, keyData);
|
||||
//update the open key Map in containerManager
|
||||
this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
|
||||
}
|
||||
/**
|
||||
* Handle Get Key operation. Calls KeyManager to process the request.
|
||||
*/
|
||||
@ -519,11 +559,13 @@ ContainerCommandResponseProto handleDeleteChunk(
|
||||
|
||||
BlockID blockID = BlockID.getFromProtobuf(
|
||||
request.getDeleteChunk().getBlockID());
|
||||
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getDeleteChunk()
|
||||
.getChunkData());
|
||||
ContainerProtos.ChunkInfo chunkInfoProto = request.getDeleteChunk()
|
||||
.getChunkData();
|
||||
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
|
||||
Preconditions.checkNotNull(chunkInfo);
|
||||
|
||||
chunkManager.deleteChunk(kvContainer, blockID, chunkInfo);
|
||||
openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, true);
|
||||
} catch (StorageContainerException ex) {
|
||||
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
||||
} catch (IOException ex) {
|
||||
@ -552,8 +594,9 @@ ContainerCommandResponseProto handleWriteChunk(
|
||||
|
||||
BlockID blockID = BlockID.getFromProtobuf(
|
||||
request.getWriteChunk().getBlockID());
|
||||
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getWriteChunk()
|
||||
.getChunkData());
|
||||
ContainerProtos.ChunkInfo chunkInfoProto =
|
||||
request.getWriteChunk().getChunkData();
|
||||
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
|
||||
Preconditions.checkNotNull(chunkInfo);
|
||||
|
||||
byte[] data = null;
|
||||
@ -570,6 +613,9 @@ ContainerCommandResponseProto handleWriteChunk(
|
||||
request.getWriteChunk().getStage() == Stage.COMBINED) {
|
||||
metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
|
||||
.getChunkData().getLen());
|
||||
// the openContainerBlockMap should be updated only while writing data
|
||||
// not during COMMIT_STAGE of handling write chunk request.
|
||||
openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, false);
|
||||
}
|
||||
} catch (StorageContainerException ex) {
|
||||
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
||||
@ -610,8 +656,9 @@ ContainerCommandResponseProto handlePutSmallFile(
|
||||
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
|
||||
putSmallFileReq.getChunkInfo());
|
||||
Preconditions.checkNotNull(chunkInfo);
|
||||
|
||||
byte[] data = putSmallFileReq.getData().toByteArray();
|
||||
// chunks will be committed as a part of handling putSmallFile
|
||||
// here. There is no need to maintain this info in openContainerBlockMap.
|
||||
chunkManager.writeChunk(
|
||||
kvContainer, blockID, chunkInfo, data, Stage.COMBINED);
|
||||
|
||||
|
@ -0,0 +1,260 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.Assert;
|
||||
import org.junit.rules.TestRule;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.LinkedList;
|
||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper
|
||||
.createSingleNodePipeline;
|
||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
|
||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
|
||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper
|
||||
.setDataChecksum;
|
||||
|
||||
/**
|
||||
* Simple tests to verify that closeContainer handler on Datanode.
|
||||
*/
|
||||
public class TestCloseContainerHandler {
|
||||
|
||||
@Rule
|
||||
public TestRule timeout = new Timeout(300000);
|
||||
|
||||
private static Configuration conf;
|
||||
private static HddsDispatcher dispatcher;
|
||||
private static ContainerSet containerSet;
|
||||
private static VolumeSet volumeSet;
|
||||
private static KeyValueHandler handler;
|
||||
private static OpenContainerBlockMap openContainerBlockMap;
|
||||
|
||||
private final static String DATANODE_UUID = UUID.randomUUID().toString();
|
||||
|
||||
private static final String baseDir = MiniDFSCluster.getBaseDirectory();
|
||||
private static final String volume1 = baseDir + "disk1";
|
||||
private static final String volume2 = baseDir + "disk2";
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
conf = new Configuration();
|
||||
String dataDirKey = volume1 + "," + volume2;
|
||||
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirKey);
|
||||
containerSet = new ContainerSet();
|
||||
DatanodeDetails datanodeDetails =
|
||||
DatanodeDetails.newBuilder().setUuid(DATANODE_UUID)
|
||||
.setHostName("localhost").setIpAddress("127.0.0.1").build();
|
||||
volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
|
||||
|
||||
dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
|
||||
handler = (KeyValueHandler) dispatcher
|
||||
.getHandler(ContainerProtos.ContainerType.KeyValueContainer);
|
||||
openContainerBlockMap = handler.getOpenContainerBlockMap();
|
||||
dispatcher.setScmId(UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() throws IOException {
|
||||
// Delete the hdds volume root dir
|
||||
List<HddsVolume> volumes = new ArrayList<>();
|
||||
volumes.addAll(volumeSet.getVolumesList());
|
||||
volumes.addAll(volumeSet.getFailedVolumesList());
|
||||
|
||||
for (HddsVolume volume : volumes) {
|
||||
FileUtils.deleteDirectory(volume.getHddsRootDir());
|
||||
}
|
||||
volumeSet.shutdown();
|
||||
}
|
||||
|
||||
private long createContainer() {
|
||||
long testContainerId = ContainerTestHelper.getTestContainerID();
|
||||
ContainerProtos.CreateContainerRequestProto createReq =
|
||||
ContainerProtos.CreateContainerRequestProto.newBuilder()
|
||||
.setContainerID(testContainerId)
|
||||
.build();
|
||||
|
||||
ContainerProtos.ContainerCommandRequestProto request =
|
||||
ContainerProtos.ContainerCommandRequestProto.newBuilder()
|
||||
.setCmdType(ContainerProtos.Type.CreateContainer)
|
||||
.setDatanodeUuid(DATANODE_UUID)
|
||||
.setCreateContainer(createReq)
|
||||
.build();
|
||||
|
||||
dispatcher.dispatch(request);
|
||||
return testContainerId;
|
||||
}
|
||||
|
||||
private List<ChunkInfo> writeChunkBuilder(BlockID blockID, Pipeline pipeline,
|
||||
int chunkCount)
|
||||
throws IOException, NoSuchAlgorithmException {
|
||||
final int datalen = 1024;
|
||||
long testContainerID = blockID.getContainerID();
|
||||
List<ChunkInfo> chunkList = new LinkedList<>();
|
||||
for (int x = 0; x < chunkCount; x++) {
|
||||
ChunkInfo info = getChunk(blockID.getLocalID(), x, datalen * x, datalen);
|
||||
byte[] data = getData(datalen);
|
||||
setDataChecksum(info, data);
|
||||
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
|
||||
ContainerProtos.WriteChunkRequestProto.newBuilder();
|
||||
writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
|
||||
writeRequest.setChunkData(info.getProtoBufMessage());
|
||||
writeRequest.setData(ByteString.copyFrom(data));
|
||||
writeRequest.setStage(ContainerProtos.Stage.COMBINED);
|
||||
ContainerProtos.ContainerCommandRequestProto.Builder request =
|
||||
ContainerProtos.ContainerCommandRequestProto.newBuilder();
|
||||
request.setCmdType(ContainerProtos.Type.WriteChunk);
|
||||
request.setWriteChunk(writeRequest);
|
||||
request.setTraceID(UUID.randomUUID().toString());
|
||||
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
|
||||
dispatcher.dispatch(request.build());
|
||||
chunkList.add(info);
|
||||
}
|
||||
return chunkList;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutKeyWithMultipleChunks()
|
||||
throws IOException, NoSuchAlgorithmException {
|
||||
long testContainerID = createContainer();
|
||||
Assert.assertNotNull(containerSet.getContainer(testContainerID));
|
||||
BlockID blockID = ContainerTestHelper.
|
||||
getTestBlockID(testContainerID);
|
||||
Pipeline pipeline = createSingleNodePipeline();
|
||||
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
|
||||
// the key should exist in the map
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
.containsKey(blockID.getLocalID()));
|
||||
KeyData keyData = new KeyData(blockID);
|
||||
List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
|
||||
for (ChunkInfo i : chunkList) {
|
||||
chunkProtoList.add(i.getProtoBufMessage());
|
||||
}
|
||||
keyData.setChunks(chunkProtoList);
|
||||
ContainerProtos.PutKeyRequestProto.Builder putKeyRequestProto =
|
||||
ContainerProtos.PutKeyRequestProto.newBuilder();
|
||||
putKeyRequestProto.setKeyData(keyData.getProtoBufMessage());
|
||||
ContainerProtos.ContainerCommandRequestProto.Builder request =
|
||||
ContainerProtos.ContainerCommandRequestProto.newBuilder();
|
||||
request.setCmdType(ContainerProtos.Type.PutKey);
|
||||
request.setPutKey(putKeyRequestProto);
|
||||
request.setTraceID(UUID.randomUUID().toString());
|
||||
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
|
||||
dispatcher.dispatch(request.build());
|
||||
|
||||
//the open key should be removed from Map
|
||||
Assert.assertNull(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteChunk() throws Exception {
|
||||
long testContainerID = createContainer();
|
||||
Assert.assertNotNull(containerSet.getContainer(testContainerID));
|
||||
BlockID blockID = ContainerTestHelper.
|
||||
getTestBlockID(testContainerID);
|
||||
Pipeline pipeline = createSingleNodePipeline();
|
||||
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
|
||||
// the key should exist in the map
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
.containsKey(blockID.getLocalID()));
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
.get(blockID.getLocalID()).getChunks().size() == 3);
|
||||
ContainerProtos.DeleteChunkRequestProto.Builder deleteChunkProto =
|
||||
ContainerProtos.DeleteChunkRequestProto.newBuilder();
|
||||
deleteChunkProto.setBlockID(blockID.getDatanodeBlockIDProtobuf());
|
||||
deleteChunkProto.setChunkData(chunkList.get(0).getProtoBufMessage());
|
||||
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
|
||||
ContainerProtos.WriteChunkRequestProto.newBuilder();
|
||||
writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
|
||||
writeRequest.setChunkData(chunkList.get(0).getProtoBufMessage());
|
||||
ContainerProtos.ContainerCommandRequestProto.Builder request =
|
||||
ContainerProtos.ContainerCommandRequestProto.newBuilder();
|
||||
request.setCmdType(ContainerProtos.Type.DeleteChunk);
|
||||
request.setDeleteChunk(deleteChunkProto);
|
||||
request.setWriteChunk(writeRequest);
|
||||
request.setTraceID(UUID.randomUUID().toString());
|
||||
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
|
||||
dispatcher.dispatch(request.build());
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
.get(blockID.getLocalID()).getChunks().size() == 2);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseContainer() throws Exception {
|
||||
long testContainerID = createContainer();
|
||||
Assert.assertNotNull(containerSet.getContainer(testContainerID));
|
||||
BlockID blockID = ContainerTestHelper.
|
||||
getTestBlockID(testContainerID);
|
||||
Pipeline pipeline = createSingleNodePipeline();
|
||||
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
|
||||
|
||||
Container container = containerSet.getContainer(testContainerID);
|
||||
KeyData keyData = openContainerBlockMap.getContainerOpenKeyMap().
|
||||
get(testContainerID).get(blockID.getLocalID());
|
||||
// the key should exist in the map
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
.containsKey(blockID.getLocalID()));
|
||||
Assert.assertTrue(
|
||||
keyData.getChunks().size() == chunkList.size());
|
||||
ContainerProtos.CloseContainerRequestProto.Builder closeContainerProto =
|
||||
ContainerProtos.CloseContainerRequestProto.newBuilder();
|
||||
closeContainerProto.setContainerID(blockID.getContainerID());
|
||||
ContainerProtos.ContainerCommandRequestProto.Builder request =
|
||||
ContainerProtos.ContainerCommandRequestProto.newBuilder();
|
||||
request.setCmdType(ContainerProtos.Type.CloseContainer);
|
||||
request.setCloseContainer(closeContainerProto);
|
||||
request.setTraceID(UUID.randomUUID().toString());
|
||||
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
|
||||
dispatcher.dispatch(request.build());
|
||||
Assert.assertNull(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID));
|
||||
// Make sure the key got committed
|
||||
Assert.assertNotNull(handler.getKeyManager().getKey(container, blockID));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user