HDDS-293. Reduce memory usage and object creation in KeyData.
This commit is contained in:
parent
2b39ad2698
commit
ee53602a81
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.client.BlockID;
|
import org.apache.hadoop.hdds.client.BlockID;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -35,11 +36,17 @@ public class KeyData {
|
|||||||
private final Map<String, String> metadata;
|
private final Map<String, String> metadata;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Represent a list of chunks.
|
||||||
|
* In order to reduce memory usage, chunkList is declared as an {@link Object}.
|
||||||
|
* When #elements == 0, chunkList is null.
|
||||||
|
* When #elements == 1, chunkList refers to the only element.
|
||||||
|
* When #elements > 1, chunkList refers to the list.
|
||||||
|
*
|
||||||
* Please note : when we are working with keys, we don't care what they point
|
* Please note : when we are working with keys, we don't care what they point
|
||||||
* to. So we We don't read chunkinfo nor validate them. It is responsibility
|
* to. So we We don't read chunkinfo nor validate them. It is responsibility
|
||||||
* of higher layer like ozone. We just read and write data from network.
|
* of higher layer like ozone. We just read and write data from network.
|
||||||
*/
|
*/
|
||||||
private List<ContainerProtos.ChunkInfo> chunks;
|
private Object chunkList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* total size of the key.
|
* total size of the key.
|
||||||
@ -73,7 +80,7 @@ public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws
|
|||||||
}
|
}
|
||||||
keyData.setChunks(data.getChunksList());
|
keyData.setChunks(data.getChunksList());
|
||||||
if (data.hasSize()) {
|
if (data.hasSize()) {
|
||||||
keyData.setSize(data.getSize());
|
Preconditions.checkArgument(data.getSize() == keyData.getSize());
|
||||||
}
|
}
|
||||||
return keyData;
|
return keyData;
|
||||||
}
|
}
|
||||||
@ -86,13 +93,13 @@ public ContainerProtos.KeyData getProtoBufMessage() {
|
|||||||
ContainerProtos.KeyData.Builder builder =
|
ContainerProtos.KeyData.Builder builder =
|
||||||
ContainerProtos.KeyData.newBuilder();
|
ContainerProtos.KeyData.newBuilder();
|
||||||
builder.setBlockID(this.blockID.getDatanodeBlockIDProtobuf());
|
builder.setBlockID(this.blockID.getDatanodeBlockIDProtobuf());
|
||||||
builder.addAllChunks(this.chunks);
|
|
||||||
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
||||||
ContainerProtos.KeyValue.Builder keyValBuilder =
|
ContainerProtos.KeyValue.Builder keyValBuilder =
|
||||||
ContainerProtos.KeyValue.newBuilder();
|
ContainerProtos.KeyValue.newBuilder();
|
||||||
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
|
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
|
||||||
.setValue(entry.getValue()).build());
|
.setValue(entry.getValue()).build());
|
||||||
}
|
}
|
||||||
|
builder.addAllChunks(getChunks());
|
||||||
builder.setSize(size);
|
builder.setSize(size);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
@ -132,30 +139,65 @@ public synchronized void deleteKey(String key) {
|
|||||||
metadata.remove(key);
|
metadata.remove(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private List<ContainerProtos.ChunkInfo> castChunkList() {
|
||||||
|
return (List<ContainerProtos.ChunkInfo>)chunkList;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns chunks list.
|
* Returns chunks list.
|
||||||
*
|
*
|
||||||
* @return list of chunkinfo.
|
* @return list of chunkinfo.
|
||||||
*/
|
*/
|
||||||
public List<ContainerProtos.ChunkInfo> getChunks() {
|
public List<ContainerProtos.ChunkInfo> getChunks() {
|
||||||
return chunks;
|
return chunkList == null? Collections.emptyList()
|
||||||
|
: chunkList instanceof ContainerProtos.ChunkInfo?
|
||||||
|
Collections.singletonList((ContainerProtos.ChunkInfo)chunkList)
|
||||||
|
: Collections.unmodifiableList(castChunkList());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds chinkInfo to the list
|
* Adds chinkInfo to the list
|
||||||
*/
|
*/
|
||||||
public void addChunk(ContainerProtos.ChunkInfo chunkInfo) {
|
public void addChunk(ContainerProtos.ChunkInfo chunkInfo) {
|
||||||
if (chunks == null) {
|
if (chunkList == null) {
|
||||||
chunks = new ArrayList<>();
|
chunkList = chunkInfo;
|
||||||
|
} else {
|
||||||
|
final List<ContainerProtos.ChunkInfo> list;
|
||||||
|
if (chunkList instanceof ContainerProtos.ChunkInfo) {
|
||||||
|
list = new ArrayList<>(2);
|
||||||
|
list.add((ContainerProtos.ChunkInfo)chunkList);
|
||||||
|
chunkList = list;
|
||||||
|
} else {
|
||||||
|
list = castChunkList();
|
||||||
}
|
}
|
||||||
chunks.add(chunkInfo);
|
list.add(chunkInfo);
|
||||||
|
}
|
||||||
|
size += chunkInfo.getLen();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* removes the chunk.
|
* removes the chunk.
|
||||||
*/
|
*/
|
||||||
public void removeChunk(ContainerProtos.ChunkInfo chunkInfo) {
|
public boolean removeChunk(ContainerProtos.ChunkInfo chunkInfo) {
|
||||||
chunks.remove(chunkInfo);
|
final boolean removed;
|
||||||
|
if (chunkList instanceof List) {
|
||||||
|
final List<ContainerProtos.ChunkInfo> list = castChunkList();
|
||||||
|
removed = list.remove(chunkInfo);
|
||||||
|
if (list.size() == 1) {
|
||||||
|
chunkList = list.get(0);
|
||||||
|
}
|
||||||
|
} else if (chunkInfo.equals(chunkList)) {
|
||||||
|
chunkList = null;
|
||||||
|
removed = true;
|
||||||
|
} else {
|
||||||
|
removed = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (removed) {
|
||||||
|
size -= chunkInfo.getLen();
|
||||||
|
}
|
||||||
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -189,15 +231,14 @@ public BlockID getBlockID() {
|
|||||||
* @param chunks - List of chunks.
|
* @param chunks - List of chunks.
|
||||||
*/
|
*/
|
||||||
public void setChunks(List<ContainerProtos.ChunkInfo> chunks) {
|
public void setChunks(List<ContainerProtos.ChunkInfo> chunks) {
|
||||||
this.chunks = chunks;
|
if (chunks == null) {
|
||||||
|
chunkList = null;
|
||||||
|
size = 0L;
|
||||||
|
} else {
|
||||||
|
final int n = chunks.size();
|
||||||
|
chunkList = n == 0? null: n == 1? chunks.get(0): chunks;
|
||||||
|
size = chunks.parallelStream().mapToLong(ContainerProtos.ChunkInfo::getLen).sum();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* sets the total size of the block
|
|
||||||
* @param size size of the block
|
|
||||||
*/
|
|
||||||
public void setSize(long size) {
|
|
||||||
this.size = size;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -207,11 +248,4 @@ public void setSize(long size) {
|
|||||||
public long getSize() {
|
public long getSize() {
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* computes the total size of chunks allocated for the key.
|
|
||||||
*/
|
|
||||||
public void computeSize() {
|
|
||||||
setSize(chunks.parallelStream().mapToLong(e -> e.getLen()).sum());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@
|
|||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map: containerId -> (localId -> KeyData).
|
* Map: containerId -> (localId -> {@link KeyData}).
|
||||||
* The outer container map does not entail locking for a better performance.
|
* The outer container map does not entail locking for a better performance.
|
||||||
* The inner {@link KeyDataMap} is synchronized.
|
* The inner {@link KeyDataMap} is synchronized.
|
||||||
*
|
*
|
||||||
|
@ -439,8 +439,6 @@ private void commitPendingKeys(KeyValueContainer kvContainer)
|
|||||||
private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
|
private void commitKey(KeyData keyData, KeyValueContainer kvContainer)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Preconditions.checkNotNull(keyData);
|
Preconditions.checkNotNull(keyData);
|
||||||
//sets the total size of the key before committing
|
|
||||||
keyData.computeSize();
|
|
||||||
keyManager.putKey(kvContainer, keyData);
|
keyManager.putKey(kvContainer, keyData);
|
||||||
//update the open key Map in containerManager
|
//update the open key Map in containerManager
|
||||||
this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
|
this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID());
|
||||||
@ -696,7 +694,6 @@ ContainerCommandResponseProto handlePutSmallFile(
|
|||||||
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
|
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
|
||||||
chunks.add(chunkInfo.getProtoBufMessage());
|
chunks.add(chunkInfo.getProtoBufMessage());
|
||||||
keyData.setChunks(chunks);
|
keyData.setChunks(chunks);
|
||||||
keyData.computeSize();
|
|
||||||
keyManager.putKey(kvContainer, keyData);
|
keyManager.putKey(kvContainer, keyData);
|
||||||
metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
|
metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
|
||||||
|
|
||||||
|
@ -0,0 +1,119 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.common.helpers;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TestRule;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests to test block deleting service.
|
||||||
|
*/
|
||||||
|
public class TestKeyData {
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(TestKeyData.class);
|
||||||
|
@Rule
|
||||||
|
public TestRule timeout = new Timeout(10000);
|
||||||
|
|
||||||
|
static ContainerProtos.ChunkInfo buildChunkInfo(String name, long offset, long len) {
|
||||||
|
return ContainerProtos.ChunkInfo.newBuilder()
|
||||||
|
.setChunkName(name).setOffset(offset).setLen(len).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddAndRemove() {
|
||||||
|
final KeyData computed = new KeyData(null);
|
||||||
|
final List<ContainerProtos.ChunkInfo> expected = new ArrayList<>();
|
||||||
|
|
||||||
|
assertChunks(expected, computed);
|
||||||
|
long offset = 0;
|
||||||
|
int n = 5;
|
||||||
|
for(int i = 0; i < n; i++) {
|
||||||
|
offset += assertAddChunk(expected, computed, offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(; !expected.isEmpty(); ) {
|
||||||
|
removeChunk(expected, computed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int chunkCount = 0;
|
||||||
|
static ContainerProtos.ChunkInfo addChunk(List<ContainerProtos.ChunkInfo> expected, long offset) {
|
||||||
|
final long length = ThreadLocalRandom.current().nextLong(1000);
|
||||||
|
final ContainerProtos.ChunkInfo info = buildChunkInfo("c" + ++chunkCount, offset, length);
|
||||||
|
expected.add(info);
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
|
static long assertAddChunk(List<ContainerProtos.ChunkInfo> expected, KeyData computed, long offset) {
|
||||||
|
final ContainerProtos.ChunkInfo info = addChunk(expected, offset);
|
||||||
|
LOG.info("addChunk: " + toString(info));
|
||||||
|
computed.addChunk(info);
|
||||||
|
assertChunks(expected, computed);
|
||||||
|
return info.getLen();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void removeChunk(List<ContainerProtos.ChunkInfo> expected, KeyData computed) {
|
||||||
|
final int i = ThreadLocalRandom.current().nextInt(expected.size());
|
||||||
|
final ContainerProtos.ChunkInfo info = expected.remove(i);
|
||||||
|
LOG.info("removeChunk: " + toString(info));
|
||||||
|
computed.removeChunk(info);
|
||||||
|
assertChunks(expected, computed);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void assertChunks(List<ContainerProtos.ChunkInfo> expected, KeyData computed) {
|
||||||
|
final List<ContainerProtos.ChunkInfo> computedChunks = computed.getChunks();
|
||||||
|
Assert.assertEquals("expected=" + expected + "\ncomputed=" + computedChunks, expected, computedChunks);
|
||||||
|
Assert.assertEquals(expected.stream().mapToLong(i -> i.getLen()).sum(), computed.getSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
static String toString(ContainerProtos.ChunkInfo info) {
|
||||||
|
return info.getChunkName() + ":" + info.getOffset() + "," + info.getLen();
|
||||||
|
}
|
||||||
|
|
||||||
|
static String toString(List<ContainerProtos.ChunkInfo> infos) {
|
||||||
|
return infos.stream().map(TestKeyData::toString)
|
||||||
|
.reduce((left, right) -> left + ", " + right)
|
||||||
|
.orElse("");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetChunks() {
|
||||||
|
final KeyData computed = new KeyData(null);
|
||||||
|
final List<ContainerProtos.ChunkInfo> expected = new ArrayList<>();
|
||||||
|
|
||||||
|
assertChunks(expected, computed);
|
||||||
|
long offset = 0;
|
||||||
|
int n = 5;
|
||||||
|
for(int i = 0; i < n; i++) {
|
||||||
|
offset += addChunk(expected, offset).getLen();
|
||||||
|
LOG.info("setChunk: " + toString(expected));
|
||||||
|
computed.setChunks(expected);
|
||||||
|
assertChunks(expected, computed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user