HDDS-265. Move numPendingDeletionBlocks and deleteTransactionId from ContainerData to KeyValueContainerData. Contributed by LiXin Ge.
This commit is contained in:
parent
81847392ba
commit
5aa15cfaff
@ -20,7 +20,6 @@
|
|||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||||
|
|
||||||
import static java.lang.Math.max;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Container Report iterates the closed containers and sends a container report
|
* Container Report iterates the closed containers and sends a container report
|
||||||
@ -37,7 +36,6 @@ public class ContainerReport {
|
|||||||
private long readBytes;
|
private long readBytes;
|
||||||
private long writeBytes;
|
private long writeBytes;
|
||||||
private long containerID;
|
private long containerID;
|
||||||
private long deleteTransactionId;
|
|
||||||
|
|
||||||
public long getContainerID() {
|
public long getContainerID() {
|
||||||
return containerID;
|
return containerID;
|
||||||
@ -47,9 +45,6 @@ public void setContainerID(long containerID) {
|
|||||||
this.containerID = containerID;
|
this.containerID = containerID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs the ContainerReport.
|
* Constructs the ContainerReport.
|
||||||
*
|
*
|
||||||
@ -66,7 +61,6 @@ public ContainerReport(long containerID, String finalhash) {
|
|||||||
this.readBytes = 0L;
|
this.readBytes = 0L;
|
||||||
this.writeCount = 0L;
|
this.writeCount = 0L;
|
||||||
this.writeBytes = 0L;
|
this.writeBytes = 0L;
|
||||||
this.deleteTransactionId = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -100,9 +94,6 @@ public static ContainerReport getFromProtoBuf(ContainerInfo info) {
|
|||||||
if (info.hasWriteBytes()) {
|
if (info.hasWriteBytes()) {
|
||||||
report.setWriteBytes(info.getWriteBytes());
|
report.setWriteBytes(info.getWriteBytes());
|
||||||
}
|
}
|
||||||
if (info.hasDeleteTransactionId()) {
|
|
||||||
report.updateDeleteTransactionId(info.getDeleteTransactionId());
|
|
||||||
}
|
|
||||||
|
|
||||||
report.setContainerID(info.getContainerID());
|
report.setContainerID(info.getContainerID());
|
||||||
return report;
|
return report;
|
||||||
@ -193,10 +184,6 @@ public void setBytesUsed(long bytesUsed) {
|
|||||||
this.bytesUsed = bytesUsed;
|
this.bytesUsed = bytesUsed;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateDeleteTransactionId(long transactionId) {
|
|
||||||
this.deleteTransactionId = max(transactionId, deleteTransactionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets a containerInfo protobuf message from ContainerReports.
|
* Gets a containerInfo protobuf message from ContainerReports.
|
||||||
*
|
*
|
||||||
@ -213,7 +200,6 @@ public ContainerInfo getProtoBufMessage() {
|
|||||||
.setWriteBytes(this.getWriteBytes())
|
.setWriteBytes(this.getWriteBytes())
|
||||||
.setFinalhash(this.getFinalhash())
|
.setFinalhash(this.getFinalhash())
|
||||||
.setContainerID(this.getContainerID())
|
.setContainerID(this.getContainerID())
|
||||||
.setDeleteTransactionId(this.deleteTransactionId)
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,117 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.container.common.helpers;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||||
|
|
||||||
|
import static java.lang.Math.max;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KeyValueContainer Report iterates the closed containers and sends a
|
||||||
|
* container report to SCM.
|
||||||
|
*/
|
||||||
|
public class KeyValueContainerReport extends ContainerReport{
|
||||||
|
private long deleteTransactionId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs the KeyValueContainerReport.
|
||||||
|
*
|
||||||
|
* @param containerID - Container ID.
|
||||||
|
* @param finalhash - Final Hash.
|
||||||
|
*/
|
||||||
|
public KeyValueContainerReport(long containerID, String finalhash) {
|
||||||
|
super(containerID, finalhash);
|
||||||
|
this.deleteTransactionId = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the deleteTransactionId if it is greater than existing.
|
||||||
|
* @param transactionId - deleteTransactionId
|
||||||
|
*/
|
||||||
|
public void updateDeleteTransactionId(long transactionId) {
|
||||||
|
this.deleteTransactionId = max(transactionId, deleteTransactionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the deleteTransactionId.
|
||||||
|
* @return - deleteTransactionId.
|
||||||
|
*/
|
||||||
|
public long getDeleteTransactionId() {
|
||||||
|
return this.deleteTransactionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a containerReport from protobuf class.
|
||||||
|
*
|
||||||
|
* @param info - ContainerInfo.
|
||||||
|
* @return - ContainerReport.
|
||||||
|
*/
|
||||||
|
public static KeyValueContainerReport getFromProtoBuf(ContainerInfo info) {
|
||||||
|
Preconditions.checkNotNull(info);
|
||||||
|
KeyValueContainerReport report = new KeyValueContainerReport(
|
||||||
|
info.getContainerID(), info.getFinalhash());
|
||||||
|
if (info.hasSize()) {
|
||||||
|
report.setSize(info.getSize());
|
||||||
|
}
|
||||||
|
if (info.hasKeyCount()) {
|
||||||
|
report.setKeyCount(info.getKeyCount());
|
||||||
|
}
|
||||||
|
if (info.hasUsed()) {
|
||||||
|
report.setBytesUsed(info.getUsed());
|
||||||
|
}
|
||||||
|
if (info.hasReadCount()) {
|
||||||
|
report.setReadCount(info.getReadCount());
|
||||||
|
}
|
||||||
|
if (info.hasReadBytes()) {
|
||||||
|
report.setReadBytes(info.getReadBytes());
|
||||||
|
}
|
||||||
|
if (info.hasWriteCount()) {
|
||||||
|
report.setWriteCount(info.getWriteCount());
|
||||||
|
}
|
||||||
|
if (info.hasWriteBytes()) {
|
||||||
|
report.setWriteBytes(info.getWriteBytes());
|
||||||
|
}
|
||||||
|
if (info.hasDeleteTransactionId()) {
|
||||||
|
report.updateDeleteTransactionId(info.getDeleteTransactionId());
|
||||||
|
}
|
||||||
|
report.setContainerID(info.getContainerID());
|
||||||
|
return report;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a containerInfo protobuf message from ContainerReports.
|
||||||
|
*
|
||||||
|
* @return ContainerInfo
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public ContainerInfo getProtoBufMessage() {
|
||||||
|
return ContainerInfo.newBuilder()
|
||||||
|
.setKeyCount(this.getKeyCount())
|
||||||
|
.setSize(this.getSize())
|
||||||
|
.setUsed(this.getBytesUsed())
|
||||||
|
.setReadCount(this.getReadCount())
|
||||||
|
.setReadBytes(this.getReadBytes())
|
||||||
|
.setWriteCount(this.getWriteCount())
|
||||||
|
.setWriteBytes(this.getWriteBytes())
|
||||||
|
.setFinalhash(this.getFinalhash())
|
||||||
|
.setContainerID(this.getContainerID())
|
||||||
|
.setDeleteTransactionId(this.getDeleteTransactionId())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
@ -33,11 +33,9 @@
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.yaml.snakeyaml.Yaml;
|
import org.yaml.snakeyaml.Yaml;
|
||||||
|
|
||||||
import static java.lang.Math.max;
|
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CHECKSUM;
|
import static org.apache.hadoop.ozone.OzoneConsts.CHECKSUM;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ID;
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ID;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_TYPE;
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_TYPE;
|
||||||
@ -81,8 +79,6 @@ public abstract class ContainerData {
|
|||||||
|
|
||||||
private HddsVolume volume;
|
private HddsVolume volume;
|
||||||
|
|
||||||
private long deleteTransactionId;
|
|
||||||
|
|
||||||
private String checksum;
|
private String checksum;
|
||||||
public static final Charset CHARSET_ENCODING = Charset.forName("UTF-8");
|
public static final Charset CHARSET_ENCODING = Charset.forName("UTF-8");
|
||||||
private static final String DUMMY_CHECKSUM = new String(new byte[64],
|
private static final String DUMMY_CHECKSUM = new String(new byte[64],
|
||||||
@ -99,12 +95,6 @@ public abstract class ContainerData {
|
|||||||
MAX_SIZE_GB,
|
MAX_SIZE_GB,
|
||||||
CHECKSUM));
|
CHECKSUM));
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Number of pending deletion blocks in container.
|
|
||||||
*/
|
|
||||||
private final AtomicInteger numPendingDeletionBlocks;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a ContainerData Object, which holds metadata of the container.
|
* Creates a ContainerData Object, which holds metadata of the container.
|
||||||
* @param type - ContainerType
|
* @param type - ContainerType
|
||||||
@ -139,8 +129,6 @@ protected ContainerData(ContainerType type, long containerId,
|
|||||||
this.bytesUsed = new AtomicLong(0L);
|
this.bytesUsed = new AtomicLong(0L);
|
||||||
this.keyCount = new AtomicLong(0L);
|
this.keyCount = new AtomicLong(0L);
|
||||||
this.maxSizeGB = size;
|
this.maxSizeGB = size;
|
||||||
this.numPendingDeletionBlocks = new AtomicInteger(0);
|
|
||||||
this.deleteTransactionId = 0;
|
|
||||||
setChecksumTo0ByteArray();
|
setChecksumTo0ByteArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -403,31 +391,6 @@ public void setKeyCount(long count) {
|
|||||||
this.keyCount.set(count);
|
this.keyCount.set(count);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Increase the count of pending deletion blocks.
|
|
||||||
*
|
|
||||||
* @param numBlocks increment number
|
|
||||||
*/
|
|
||||||
public void incrPendingDeletionBlocks(int numBlocks) {
|
|
||||||
this.numPendingDeletionBlocks.addAndGet(numBlocks);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decrease the count of pending deletion blocks.
|
|
||||||
*
|
|
||||||
* @param numBlocks decrement number
|
|
||||||
*/
|
|
||||||
public void decrPendingDeletionBlocks(int numBlocks) {
|
|
||||||
this.numPendingDeletionBlocks.addAndGet(-1 * numBlocks);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of pending deletion blocks.
|
|
||||||
*/
|
|
||||||
public int getNumPendingDeletionBlocks() {
|
|
||||||
return this.numPendingDeletionBlocks.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setChecksumTo0ByteArray() {
|
public void setChecksumTo0ByteArray() {
|
||||||
this.checksum = DUMMY_CHECKSUM;
|
this.checksum = DUMMY_CHECKSUM;
|
||||||
}
|
}
|
||||||
@ -469,20 +432,4 @@ public void computeAndSetChecksum(Yaml yaml) throws IOException {
|
|||||||
* @return Protocol Buffer Message
|
* @return Protocol Buffer Message
|
||||||
*/
|
*/
|
||||||
public abstract ContainerProtos.ContainerData getProtoBufMessage();
|
public abstract ContainerProtos.ContainerData getProtoBufMessage();
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets deleteTransactionId to latest delete transactionId for the container.
|
|
||||||
*
|
|
||||||
* @param transactionId latest transactionId of the container.
|
|
||||||
*/
|
|
||||||
public void updateDeleteTransactionId(long transactionId) {
|
|
||||||
deleteTransactionId = max(transactionId, deleteTransactionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the latest deleteTransactionId of the container.
|
|
||||||
*/
|
|
||||||
public long getDeleteTransactionId() {
|
|
||||||
return deleteTransactionId;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -22,9 +22,6 @@
|
|||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
|
||||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
import org.apache.hadoop.hdds.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
@ -43,8 +40,6 @@
|
|||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
||||||
.Result.INVALID_CONTAINER_STATE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class that manages Containers created on the datanode.
|
* Class that manages Containers created on the datanode.
|
||||||
@ -204,58 +199,19 @@ public ContainerReportsProto getContainerReport() throws IOException {
|
|||||||
ContainerReportsProto.Builder crBuilder =
|
ContainerReportsProto.Builder crBuilder =
|
||||||
ContainerReportsProto.newBuilder();
|
ContainerReportsProto.newBuilder();
|
||||||
|
|
||||||
|
|
||||||
for (Container container: containers) {
|
for (Container container: containers) {
|
||||||
long containerId = container.getContainerData().getContainerID();
|
crBuilder.addReports(container.getContainerReport());
|
||||||
ContainerInfo.Builder ciBuilder = ContainerInfo.newBuilder();
|
|
||||||
ContainerData containerData = container.getContainerData();
|
|
||||||
ciBuilder.setContainerID(containerId)
|
|
||||||
.setReadCount(containerData.getReadCount())
|
|
||||||
.setWriteCount(containerData.getWriteCount())
|
|
||||||
.setReadBytes(containerData.getReadBytes())
|
|
||||||
.setWriteBytes(containerData.getWriteBytes())
|
|
||||||
.setUsed(containerData.getBytesUsed())
|
|
||||||
.setState(getState(containerData))
|
|
||||||
.setDeleteTransactionId(containerData.getDeleteTransactionId());
|
|
||||||
|
|
||||||
crBuilder.addReports(ciBuilder.build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return crBuilder.build();
|
return crBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns LifeCycle State of the container.
|
|
||||||
* @param containerData - ContainerData
|
|
||||||
* @return LifeCycle State of the container
|
|
||||||
* @throws StorageContainerException
|
|
||||||
*/
|
|
||||||
private HddsProtos.LifeCycleState getState(ContainerData containerData)
|
|
||||||
throws StorageContainerException {
|
|
||||||
HddsProtos.LifeCycleState state;
|
|
||||||
switch (containerData.getState()) {
|
|
||||||
case OPEN:
|
|
||||||
state = HddsProtos.LifeCycleState.OPEN;
|
|
||||||
break;
|
|
||||||
case CLOSING:
|
|
||||||
state = HddsProtos.LifeCycleState.CLOSING;
|
|
||||||
break;
|
|
||||||
case CLOSED:
|
|
||||||
state = HddsProtos.LifeCycleState.CLOSED;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new StorageContainerException("Invalid Container state found: " +
|
|
||||||
containerData.getContainerID(), INVALID_CONTAINER_STATE);
|
|
||||||
}
|
|
||||||
return state;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||||
ContainerDeletionChoosingPolicy deletionPolicy)
|
ContainerDeletionChoosingPolicy deletionPolicy)
|
||||||
throws StorageContainerException {
|
throws StorageContainerException {
|
||||||
Map<Long, ContainerData> containerDataMap = containerMap.entrySet().stream()
|
Map<Long, ContainerData> containerDataMap = containerMap.entrySet().stream()
|
||||||
.filter(e -> e.getValue().getContainerType()
|
.filter(e -> deletionPolicy.isValidContainerType(
|
||||||
== ContainerProtos.ContainerType.KeyValueContainer)
|
e.getValue().getContainerType()))
|
||||||
.collect(Collectors.toMap(Map.Entry::getKey,
|
.collect(Collectors.toMap(Map.Entry::getKey,
|
||||||
e -> e.getValue().getContainerData()));
|
e -> e.getValue().getContainerData()));
|
||||||
return deletionPolicy
|
return deletionPolicy
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces
|
import org.apache.hadoop.ozone.container.common.interfaces
|
||||||
.ContainerDeletionChoosingPolicy;
|
.ContainerDeletionChoosingPolicy;
|
||||||
|
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -58,7 +59,7 @@ public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
|||||||
LOG.debug("Select container {} for block deletion, "
|
LOG.debug("Select container {} for block deletion, "
|
||||||
+ "pending deletion blocks num: {}.",
|
+ "pending deletion blocks num: {}.",
|
||||||
entry.getContainerID(),
|
entry.getContainerID(),
|
||||||
entry.getNumPendingDeletionBlocks());
|
((KeyValueContainerData)entry).getNumPendingDeletionBlocks());
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
.StorageContainerException;
|
.StorageContainerException;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces
|
import org.apache.hadoop.ozone.container.common.interfaces
|
||||||
.ContainerDeletionChoosingPolicy;
|
.ContainerDeletionChoosingPolicy;
|
||||||
|
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -41,14 +42,11 @@ public class TopNOrderedContainerDeletionChoosingPolicy
|
|||||||
LoggerFactory.getLogger(TopNOrderedContainerDeletionChoosingPolicy.class);
|
LoggerFactory.getLogger(TopNOrderedContainerDeletionChoosingPolicy.class);
|
||||||
|
|
||||||
/** customized comparator used to compare differentiate container data. **/
|
/** customized comparator used to compare differentiate container data. **/
|
||||||
private static final Comparator<ContainerData> CONTAINER_DATA_COMPARATOR
|
private static final Comparator<KeyValueContainerData>
|
||||||
= new Comparator<ContainerData>() {
|
KEY_VALUE_CONTAINER_DATA_COMPARATOR = (KeyValueContainerData c1,
|
||||||
@Override
|
KeyValueContainerData c2) ->
|
||||||
public int compare(ContainerData c1, ContainerData c2) {
|
Integer.compare(c2.getNumPendingDeletionBlocks(),
|
||||||
return Integer.compare(c2.getNumPendingDeletionBlocks(),
|
c1.getNumPendingDeletionBlocks());
|
||||||
c1.getNumPendingDeletionBlocks());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||||
@ -58,13 +56,15 @@ public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
|||||||
"Internal assertion: candidate containers cannot be null");
|
"Internal assertion: candidate containers cannot be null");
|
||||||
|
|
||||||
List<ContainerData> result = new LinkedList<>();
|
List<ContainerData> result = new LinkedList<>();
|
||||||
List<ContainerData> orderedList = new LinkedList<>();
|
List<KeyValueContainerData> orderedList = new LinkedList<>();
|
||||||
orderedList.addAll(candidateContainers.values());
|
for (ContainerData entry : candidateContainers.values()) {
|
||||||
Collections.sort(orderedList, CONTAINER_DATA_COMPARATOR);
|
orderedList.add((KeyValueContainerData)entry);
|
||||||
|
}
|
||||||
|
Collections.sort(orderedList, KEY_VALUE_CONTAINER_DATA_COMPARATOR);
|
||||||
|
|
||||||
// get top N list ordered by pending deletion blocks' number
|
// get top N list ordered by pending deletion blocks' number
|
||||||
int currentCount = 0;
|
int currentCount = 0;
|
||||||
for (ContainerData entry : orderedList) {
|
for (KeyValueContainerData entry : orderedList) {
|
||||||
if (currentCount < count) {
|
if (currentCount < count) {
|
||||||
if (entry.getNumPendingDeletionBlocks() > 0) {
|
if (entry.getNumPendingDeletionBlocks() > 0) {
|
||||||
result.add(entry);
|
result.add(entry);
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ContainerLifeCycleState;
|
.ContainerLifeCycleState;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.
|
import org.apache.hadoop.hdds.scm.container.common.helpers.
|
||||||
StorageContainerException;
|
StorageContainerException;
|
||||||
|
|
||||||
@ -111,4 +112,9 @@ void update(Map<String, String> metaData, boolean forceUpdate)
|
|||||||
*/
|
*/
|
||||||
BlockIterator blockIterator() throws IOException;
|
BlockIterator blockIterator() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns containerReport for the container.
|
||||||
|
*/
|
||||||
|
StorageContainerDatanodeProtocolProtos.ContainerInfo getContainerReport()
|
||||||
|
throws StorageContainerException;
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.container.common.interfaces;
|
package org.apache.hadoop.ozone.container.common.interfaces;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
.StorageContainerException;
|
.StorageContainerException;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||||
@ -42,4 +43,16 @@ public interface ContainerDeletionChoosingPolicy {
|
|||||||
List<ContainerData> chooseContainerForBlockDeletion(int count,
|
List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||||
Map<Long, ContainerData> candidateContainers)
|
Map<Long, ContainerData> candidateContainers)
|
||||||
throws StorageContainerException;
|
throws StorageContainerException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine if the container has suitable type for this policy.
|
||||||
|
* @param type type of the container
|
||||||
|
* @return whether the container type suitable for this policy.
|
||||||
|
*/
|
||||||
|
default boolean isValidContainerType(ContainerProtos.ContainerType type) {
|
||||||
|
if (type == ContainerProtos.ContainerType.KeyValueContainer) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,8 @@
|
|||||||
.ContainerLifeCycleState;
|
.ContainerLifeCycleState;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ContainerType;
|
.ContainerType;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
.StorageContainerException;
|
.StorageContainerException;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
@ -405,6 +407,50 @@ public File getContainerFile() {
|
|||||||
.getContainerID() + OzoneConsts.CONTAINER_EXTENSION);
|
.getContainerID() + OzoneConsts.CONTAINER_EXTENSION);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns KeyValueContainerReport for the KeyValueContainer.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public StorageContainerDatanodeProtocolProtos.ContainerInfo
|
||||||
|
getContainerReport() throws StorageContainerException{
|
||||||
|
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
|
||||||
|
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
|
||||||
|
ciBuilder.setContainerID(containerData.getContainerID())
|
||||||
|
.setReadCount(containerData.getReadCount())
|
||||||
|
.setWriteCount(containerData.getWriteCount())
|
||||||
|
.setReadBytes(containerData.getReadBytes())
|
||||||
|
.setWriteBytes(containerData.getWriteBytes())
|
||||||
|
.setUsed(containerData.getBytesUsed())
|
||||||
|
.setState(getHddsState())
|
||||||
|
.setDeleteTransactionId(containerData.getDeleteTransactionId());
|
||||||
|
return ciBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns LifeCycle State of the container.
|
||||||
|
* @return LifeCycle State of the container in HddsProtos format
|
||||||
|
* @throws StorageContainerException
|
||||||
|
*/
|
||||||
|
private HddsProtos.LifeCycleState getHddsState()
|
||||||
|
throws StorageContainerException {
|
||||||
|
HddsProtos.LifeCycleState state;
|
||||||
|
switch (containerData.getState()) {
|
||||||
|
case OPEN:
|
||||||
|
state = HddsProtos.LifeCycleState.OPEN;
|
||||||
|
break;
|
||||||
|
case CLOSING:
|
||||||
|
state = HddsProtos.LifeCycleState.CLOSING;
|
||||||
|
break;
|
||||||
|
case CLOSED:
|
||||||
|
state = HddsProtos.LifeCycleState.CLOSED;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new StorageContainerException("Invalid Container state found: " +
|
||||||
|
containerData.getContainerID(), INVALID_CONTAINER_STATE);
|
||||||
|
}
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns container DB file.
|
* Returns container DB file.
|
||||||
* @return
|
* @return
|
||||||
|
@ -32,7 +32,9 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static java.lang.Math.max;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CHUNKS_PATH;
|
import static org.apache.hadoop.ozone.OzoneConsts.CHUNKS_PATH;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE;
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.METADATA_PATH;
|
import static org.apache.hadoop.ozone.OzoneConsts.METADATA_PATH;
|
||||||
@ -61,6 +63,13 @@ public class KeyValueContainerData extends ContainerData {
|
|||||||
|
|
||||||
private File dbFile = null;
|
private File dbFile = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of pending deletion blocks in KeyValueContainer.
|
||||||
|
*/
|
||||||
|
private final AtomicInteger numPendingDeletionBlocks;
|
||||||
|
|
||||||
|
private long deleteTransactionId;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
// Initialize YAML fields
|
// Initialize YAML fields
|
||||||
KV_YAML_FIELDS = Lists.newArrayList();
|
KV_YAML_FIELDS = Lists.newArrayList();
|
||||||
@ -77,6 +86,8 @@ public class KeyValueContainerData extends ContainerData {
|
|||||||
*/
|
*/
|
||||||
public KeyValueContainerData(long id, int size) {
|
public KeyValueContainerData(long id, int size) {
|
||||||
super(ContainerProtos.ContainerType.KeyValueContainer, id, size);
|
super(ContainerProtos.ContainerType.KeyValueContainer, id, size);
|
||||||
|
this.numPendingDeletionBlocks = new AtomicInteger(0);
|
||||||
|
this.deleteTransactionId = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -88,6 +99,8 @@ public KeyValueContainerData(long id, int size) {
|
|||||||
public KeyValueContainerData(long id, int layOutVersion, int size) {
|
public KeyValueContainerData(long id, int layOutVersion, int size) {
|
||||||
super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion,
|
super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion,
|
||||||
size);
|
size);
|
||||||
|
this.numPendingDeletionBlocks = new AtomicInteger(0);
|
||||||
|
this.deleteTransactionId = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -168,6 +181,47 @@ public void setContainerDBType(String containerDBType) {
|
|||||||
this.containerDBType = containerDBType;
|
this.containerDBType = containerDBType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increase the count of pending deletion blocks.
|
||||||
|
*
|
||||||
|
* @param numBlocks increment number
|
||||||
|
*/
|
||||||
|
public void incrPendingDeletionBlocks(int numBlocks) {
|
||||||
|
this.numPendingDeletionBlocks.addAndGet(numBlocks);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrease the count of pending deletion blocks.
|
||||||
|
*
|
||||||
|
* @param numBlocks decrement number
|
||||||
|
*/
|
||||||
|
public void decrPendingDeletionBlocks(int numBlocks) {
|
||||||
|
this.numPendingDeletionBlocks.addAndGet(-1 * numBlocks);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of pending deletion blocks.
|
||||||
|
*/
|
||||||
|
public int getNumPendingDeletionBlocks() {
|
||||||
|
return this.numPendingDeletionBlocks.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets deleteTransactionId to latest delete transactionId for the container.
|
||||||
|
*
|
||||||
|
* @param transactionId latest transactionId of the container.
|
||||||
|
*/
|
||||||
|
public void updateDeleteTransactionId(long transactionId) {
|
||||||
|
deleteTransactionId = max(transactionId, deleteTransactionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the latest deleteTransactionId of the container.
|
||||||
|
*/
|
||||||
|
public long getDeleteTransactionId() {
|
||||||
|
return deleteTransactionId;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a ProtoBuf Message from ContainerData.
|
* Returns a ProtoBuf Message from ContainerData.
|
||||||
*
|
*
|
||||||
|
@ -57,6 +57,7 @@ public void testKeyValueData() {
|
|||||||
assertEquals(val.get(), kvData.getReadCount());
|
assertEquals(val.get(), kvData.getReadCount());
|
||||||
assertEquals(val.get(), kvData.getWriteCount());
|
assertEquals(val.get(), kvData.getWriteCount());
|
||||||
assertEquals(val.get(), kvData.getKeyCount());
|
assertEquals(val.get(), kvData.getKeyCount());
|
||||||
|
assertEquals(val.get(), kvData.getNumPendingDeletionBlocks());
|
||||||
assertEquals(MAXSIZE, kvData.getMaxSizeGB());
|
assertEquals(MAXSIZE, kvData.getMaxSizeGB());
|
||||||
|
|
||||||
kvData.setState(state);
|
kvData.setState(state);
|
||||||
@ -68,6 +69,7 @@ public void testKeyValueData() {
|
|||||||
kvData.incrReadCount();
|
kvData.incrReadCount();
|
||||||
kvData.incrWriteCount();
|
kvData.incrWriteCount();
|
||||||
kvData.incrKeyCount();
|
kvData.incrKeyCount();
|
||||||
|
kvData.incrPendingDeletionBlocks(1);
|
||||||
|
|
||||||
assertEquals(state, kvData.getState());
|
assertEquals(state, kvData.getState());
|
||||||
assertEquals(containerDBType, kvData.getContainerDBType());
|
assertEquals(containerDBType, kvData.getContainerDBType());
|
||||||
@ -79,7 +81,7 @@ public void testKeyValueData() {
|
|||||||
assertEquals(1, kvData.getReadCount());
|
assertEquals(1, kvData.getReadCount());
|
||||||
assertEquals(1, kvData.getWriteCount());
|
assertEquals(1, kvData.getWriteCount());
|
||||||
assertEquals(1, kvData.getKeyCount());
|
assertEquals(1, kvData.getKeyCount());
|
||||||
|
assertEquals(1, kvData.getNumPendingDeletionBlocks());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -203,8 +203,12 @@ public void testBlockDeletion() throws Exception {
|
|||||||
MetadataStore meta = KeyUtils.getDB(
|
MetadataStore meta = KeyUtils.getDB(
|
||||||
(KeyValueContainerData) containerData.get(0), conf);
|
(KeyValueContainerData) containerData.get(0), conf);
|
||||||
Map<Long, Container> containerMap = containerSet.getContainerMap();
|
Map<Long, Container> containerMap = containerSet.getContainerMap();
|
||||||
long transactionId = containerMap.get(containerData.get(0).getContainerID())
|
// NOTE: this test assumes that all the container is KetValueContainer and
|
||||||
.getContainerData().getDeleteTransactionId();
|
// have DeleteTransactionId in KetValueContainerData. If other
|
||||||
|
// types is going to be added, this test should be checked.
|
||||||
|
long transactionId = ((KeyValueContainerData)containerMap
|
||||||
|
.get(containerData.get(0).getContainerID()).getContainerData())
|
||||||
|
.getDeleteTransactionId();
|
||||||
|
|
||||||
|
|
||||||
// Number of deleted blocks in container should be equal to 0 before
|
// Number of deleted blocks in container should be equal to 0 before
|
||||||
|
@ -139,7 +139,9 @@ public void testBlockDeletion()
|
|||||||
Assert.assertTrue(verifyBlocksCreated(omKeyLocationInfoGroupList));
|
Assert.assertTrue(verifyBlocksCreated(omKeyLocationInfoGroupList));
|
||||||
// No containers with deleted blocks
|
// No containers with deleted blocks
|
||||||
Assert.assertTrue(containerIdsWithDeletedBlocks.isEmpty());
|
Assert.assertTrue(containerIdsWithDeletedBlocks.isEmpty());
|
||||||
// Delete transactionIds for the containers should be 0
|
// Delete transactionIds for the containers should be 0.
|
||||||
|
// NOTE: this test assumes that all the container is KetValueContainer. If
|
||||||
|
// other container types is going to be added, this test should be checked.
|
||||||
matchContainerTransactionIds();
|
matchContainerTransactionIds();
|
||||||
om.deleteKey(keyArgs);
|
om.deleteKey(keyArgs);
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
@ -215,8 +217,9 @@ private void matchContainerTransactionIds() throws IOException {
|
|||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
scm.getContainerInfo(containerId).getDeleteTransactionId(), 0);
|
scm.getContainerInfo(containerId).getDeleteTransactionId(), 0);
|
||||||
}
|
}
|
||||||
Assert.assertEquals(dnContainerSet.getContainer(containerId)
|
Assert.assertEquals(((KeyValueContainerData)dnContainerSet
|
||||||
.getContainerData().getDeleteTransactionId(),
|
.getContainer(containerId).getContainerData())
|
||||||
|
.getDeleteTransactionId(),
|
||||||
scm.getContainerInfo(containerId).getDeleteTransactionId());
|
scm.getContainerInfo(containerId).getDeleteTransactionId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user