HDDS-288. Fix bugs in OpenContainerBlockMap. Contributed by Tsz Wo Nicholas Sze.
This commit is contained in:
parent
3d3158cea4
commit
3c4fbc635e
@ -21,22 +21,52 @@
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Map: containerId -> (localId -> KeyData).
|
||||
* The outer container map does not entail locking for a better performance.
|
||||
* The inner {@link KeyDataMap} is synchronized.
|
||||
*
|
||||
* This class will maintain list of open keys per container when closeContainer
|
||||
* command comes, it should autocommit all open keys of a open container before
|
||||
* marking the container as closed.
|
||||
*/
|
||||
public class OpenContainerBlockMap {
|
||||
/**
|
||||
* Map: localId -> KeyData.
|
||||
*
|
||||
* In order to support {@link #getAll()}, the update operations are synchronized.
|
||||
*/
|
||||
static class KeyDataMap {
|
||||
private final ConcurrentMap<Long, KeyData> blocks = new ConcurrentHashMap<>();
|
||||
|
||||
KeyData get(long localId) {
|
||||
return blocks.get(localId);
|
||||
}
|
||||
|
||||
synchronized int removeAndGetSize(long localId) {
|
||||
blocks.remove(localId);
|
||||
return blocks.size();
|
||||
}
|
||||
|
||||
synchronized KeyData computeIfAbsent(long localId, Function<Long, KeyData> f) {
|
||||
return blocks.computeIfAbsent(localId, f);
|
||||
}
|
||||
|
||||
synchronized List<KeyData> getAll() {
|
||||
return new ArrayList<>(blocks.values());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO : We may construct the openBlockMap by reading the Block Layout
|
||||
@ -46,15 +76,8 @@ public class OpenContainerBlockMap {
|
||||
*
|
||||
* For now, we will track all open blocks of a container in the blockMap.
|
||||
*/
|
||||
private final ConcurrentHashMap<Long, HashMap<Long, KeyData>>
|
||||
openContainerBlockMap;
|
||||
private final ConcurrentMap<Long, KeyDataMap> containers = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Constructs OpenContainerBlockMap.
|
||||
*/
|
||||
public OpenContainerBlockMap() {
|
||||
openContainerBlockMap = new ConcurrentHashMap<>();
|
||||
}
|
||||
/**
|
||||
* Removes the Container matching with specified containerId.
|
||||
* @param containerId containerId
|
||||
@ -62,73 +85,27 @@ public OpenContainerBlockMap() {
|
||||
public void removeContainer(long containerId) {
|
||||
Preconditions
|
||||
.checkState(containerId >= 0, "Container Id cannot be negative.");
|
||||
openContainerBlockMap.computeIfPresent(containerId, (k, v) -> null);
|
||||
containers.remove(containerId);
|
||||
}
|
||||
|
||||
/**
|
||||
* updates the chunkInfoList in case chunk is added or deleted
|
||||
* @param blockID id of the block.
|
||||
* @param info - Chunk Info
|
||||
* @param remove if true, deletes the chunkInfo list otherwise appends to the
|
||||
* chunkInfo List
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void updateOpenKeyMap(BlockID blockID,
|
||||
ContainerProtos.ChunkInfo info, boolean remove) throws IOException {
|
||||
if (remove) {
|
||||
deleteChunkFromMap(blockID, info);
|
||||
} else {
|
||||
addChunkToMap(blockID, info);
|
||||
}
|
||||
}
|
||||
|
||||
private KeyData getKeyData(ContainerProtos.ChunkInfo info, BlockID blockID)
|
||||
throws IOException {
|
||||
KeyData keyData = new KeyData(blockID);
|
||||
keyData.addMetadata("TYPE", "KEY");
|
||||
keyData.addChunk(info);
|
||||
return keyData;
|
||||
}
|
||||
|
||||
private void addChunkToMap(BlockID blockID, ContainerProtos.ChunkInfo info)
|
||||
throws IOException {
|
||||
public void addChunk(BlockID blockID, ChunkInfo info) {
|
||||
Preconditions.checkNotNull(info);
|
||||
long containerId = blockID.getContainerID();
|
||||
long localID = blockID.getLocalID();
|
||||
|
||||
KeyData keyData = openContainerBlockMap.computeIfAbsent(containerId,
|
||||
emptyMap -> new LinkedHashMap<Long, KeyData>())
|
||||
.putIfAbsent(localID, getKeyData(info, blockID));
|
||||
// KeyData != null means the block already exist
|
||||
if (keyData != null) {
|
||||
HashMap<Long, KeyData> keyDataSet =
|
||||
openContainerBlockMap.get(containerId);
|
||||
keyDataSet.putIfAbsent(blockID.getLocalID(), getKeyData(info, blockID));
|
||||
keyDataSet.computeIfPresent(blockID.getLocalID(), (key, value) -> {
|
||||
value.addChunk(info);
|
||||
return value;
|
||||
});
|
||||
}
|
||||
containers.computeIfAbsent(blockID.getContainerID(), id -> new KeyDataMap())
|
||||
.computeIfAbsent(blockID.getLocalID(), id -> new KeyData(blockID))
|
||||
.addChunk(info);
|
||||
}
|
||||
|
||||
/**
|
||||
* removes the chunks from the chunkInfo list for the given block.
|
||||
* Removes the chunk from the chunkInfo list for the given block.
|
||||
* @param blockID id of the block
|
||||
* @param chunkInfo chunk info.
|
||||
*/
|
||||
private synchronized void deleteChunkFromMap(BlockID blockID,
|
||||
ContainerProtos.ChunkInfo chunkInfo) {
|
||||
public void removeChunk(BlockID blockID, ChunkInfo chunkInfo) {
|
||||
Preconditions.checkNotNull(chunkInfo);
|
||||
Preconditions.checkNotNull(blockID);
|
||||
HashMap<Long, KeyData> keyDataMap =
|
||||
openContainerBlockMap.get(blockID.getContainerID());
|
||||
if (keyDataMap != null) {
|
||||
long localId = blockID.getLocalID();
|
||||
KeyData keyData = keyDataMap.get(localId);
|
||||
if (keyData != null) {
|
||||
keyData.removeChunk(chunkInfo);
|
||||
}
|
||||
}
|
||||
Optional.ofNullable(containers.get(blockID.getContainerID()))
|
||||
.map(blocks -> blocks.get(blockID.getLocalID()))
|
||||
.ifPresent(keyData -> keyData.removeChunk(chunkInfo));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -137,31 +114,23 @@ private synchronized void deleteChunkFromMap(BlockID blockID,
|
||||
* @return List of open Keys(blocks)
|
||||
*/
|
||||
public List<KeyData> getOpenKeys(long containerId) {
|
||||
HashMap<Long, KeyData> keyDataHashMap =
|
||||
openContainerBlockMap.get(containerId);
|
||||
return keyDataHashMap == null ? null :
|
||||
keyDataHashMap.values().stream().collect(Collectors.toList());
|
||||
return Optional.ofNullable(containers.get(containerId))
|
||||
.map(KeyDataMap::getAll)
|
||||
.orElseGet(Collections::emptyList);
|
||||
}
|
||||
|
||||
/**
|
||||
* removes the block from the block map.
|
||||
* @param blockID
|
||||
*/
|
||||
public synchronized void removeFromKeyMap(BlockID blockID) {
|
||||
public void removeFromKeyMap(BlockID blockID) {
|
||||
Preconditions.checkNotNull(blockID);
|
||||
HashMap<Long, KeyData> keyDataMap =
|
||||
openContainerBlockMap.get(blockID.getContainerID());
|
||||
if (keyDataMap != null) {
|
||||
keyDataMap.remove(blockID.getLocalID());
|
||||
if (keyDataMap.size() == 0) {
|
||||
removeContainer(blockID.getContainerID());
|
||||
}
|
||||
}
|
||||
containers.computeIfPresent(blockID.getContainerID(), (containerId, blocks)
|
||||
-> blocks.removeAndGetSize(blockID.getLocalID()) == 0? null: blocks);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ConcurrentHashMap<Long,
|
||||
HashMap<Long, KeyData>> getContainerOpenKeyMap() {
|
||||
return openContainerBlockMap;
|
||||
KeyDataMap getKeyDataMap(long containerId) {
|
||||
return containers.get(containerId);
|
||||
}
|
||||
}
|
||||
|
@ -435,12 +435,10 @@ private void commitPendingKeys(KeyValueContainer kvContainer)
|
||||
long containerId = kvContainer.getContainerData().getContainerID();
|
||||
List<KeyData> pendingKeys =
|
||||
this.openContainerBlockMap.getOpenKeys(containerId);
|
||||
if (pendingKeys != null) {
|
||||
for(KeyData keyData : pendingKeys) {
|
||||
commitKey(keyData, kvContainer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
|
||||
throws IOException {
|
||||
@ -598,7 +596,7 @@ ContainerCommandResponseProto handleDeleteChunk(
|
||||
Preconditions.checkNotNull(chunkInfo);
|
||||
|
||||
chunkManager.deleteChunk(kvContainer, blockID, chunkInfo);
|
||||
openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, true);
|
||||
openContainerBlockMap.removeChunk(blockID, chunkInfoProto);
|
||||
} catch (StorageContainerException ex) {
|
||||
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
||||
} catch (IOException ex) {
|
||||
@ -648,7 +646,7 @@ ContainerCommandResponseProto handleWriteChunk(
|
||||
.getChunkData().getLen());
|
||||
// the openContainerBlockMap should be updated only while writing data
|
||||
// not during COMMIT_STAGE of handling write chunk request.
|
||||
openContainerBlockMap.updateOpenKeyMap(blockID, chunkInfoProto, false);
|
||||
openContainerBlockMap.addChunk(blockID, chunkInfoProto);
|
||||
}
|
||||
} catch (StorageContainerException ex) {
|
||||
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
||||
|
@ -162,9 +162,9 @@ public void testPutKeyWithMultipleChunks()
|
||||
Pipeline pipeline = createSingleNodePipeline();
|
||||
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
|
||||
// the key should exist in the map
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
.containsKey(blockID.getLocalID()));
|
||||
Assert.assertNotNull(
|
||||
openContainerBlockMap.getKeyDataMap(testContainerID)
|
||||
.get(blockID.getLocalID()));
|
||||
KeyData keyData = new KeyData(blockID);
|
||||
List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
|
||||
for (ChunkInfo i : chunkList) {
|
||||
@ -184,7 +184,7 @@ public void testPutKeyWithMultipleChunks()
|
||||
|
||||
//the open key should be removed from Map
|
||||
Assert.assertNull(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID));
|
||||
openContainerBlockMap.getKeyDataMap(testContainerID));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -196,11 +196,11 @@ public void testDeleteChunk() throws Exception {
|
||||
Pipeline pipeline = createSingleNodePipeline();
|
||||
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
|
||||
// the key should exist in the map
|
||||
Assert.assertNotNull(
|
||||
openContainerBlockMap.getKeyDataMap(testContainerID)
|
||||
.get(blockID.getLocalID()));
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
.containsKey(blockID.getLocalID()));
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
openContainerBlockMap.getKeyDataMap(testContainerID)
|
||||
.get(blockID.getLocalID()).getChunks().size() == 3);
|
||||
ContainerProtos.DeleteChunkRequestProto.Builder deleteChunkProto =
|
||||
ContainerProtos.DeleteChunkRequestProto.newBuilder();
|
||||
@ -219,7 +219,7 @@ public void testDeleteChunk() throws Exception {
|
||||
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
|
||||
dispatcher.dispatch(request.build());
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
openContainerBlockMap.getKeyDataMap(testContainerID)
|
||||
.get(blockID.getLocalID()).getChunks().size() == 2);
|
||||
|
||||
}
|
||||
@ -234,12 +234,12 @@ public void testCloseContainer() throws Exception {
|
||||
List<ChunkInfo> chunkList = writeChunkBuilder(blockID, pipeline, 3);
|
||||
|
||||
Container container = containerSet.getContainer(testContainerID);
|
||||
KeyData keyData = openContainerBlockMap.getContainerOpenKeyMap().
|
||||
get(testContainerID).get(blockID.getLocalID());
|
||||
KeyData keyData = openContainerBlockMap.
|
||||
getKeyDataMap(testContainerID).get(blockID.getLocalID());
|
||||
// the key should exist in the map
|
||||
Assert.assertTrue(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID)
|
||||
.containsKey(blockID.getLocalID()));
|
||||
Assert.assertNotNull(
|
||||
openContainerBlockMap.getKeyDataMap(testContainerID)
|
||||
.get(blockID.getLocalID()));
|
||||
Assert.assertTrue(
|
||||
keyData.getChunks().size() == chunkList.size());
|
||||
ContainerProtos.CloseContainerRequestProto.Builder closeContainerProto =
|
||||
@ -253,7 +253,7 @@ public void testCloseContainer() throws Exception {
|
||||
request.setDatanodeUuid(pipeline.getLeader().getUuidString());
|
||||
dispatcher.dispatch(request.build());
|
||||
Assert.assertNull(
|
||||
openContainerBlockMap.getContainerOpenKeyMap().get(testContainerID));
|
||||
openContainerBlockMap.getKeyDataMap(testContainerID));
|
||||
// Make sure the key got committed
|
||||
Assert.assertNotNull(handler.getKeyManager().getKey(container, blockID));
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user