HDDS-1511. Space tracking for Open Containers in HDDS Volumes. Contributed by Supratim Deka (#812)
This commit is contained in:
parent
570fa2da20
commit
9569015802
@ -71,6 +71,8 @@ public abstract class ContainerData {
|
||||
|
||||
private final long maxSize;
|
||||
|
||||
private boolean committedSpace;
|
||||
|
||||
//ID of the pipeline where this container is created
|
||||
private String originPipelineId;
|
||||
//ID of the datanode where this container is created
|
||||
@ -184,7 +186,23 @@ public synchronized ContainerDataProto.State getState() {
|
||||
* @param state
|
||||
*/
|
||||
public synchronized void setState(ContainerDataProto.State state) {
|
||||
ContainerDataProto.State oldState = this.state;
|
||||
this.state = state;
|
||||
|
||||
if ((oldState == ContainerDataProto.State.OPEN) &&
|
||||
(state != oldState)) {
|
||||
releaseCommitSpace();
|
||||
}
|
||||
|
||||
/**
|
||||
* commit space when container transitions (back) to Open.
|
||||
* when? perhaps closing a container threw an exception
|
||||
*/
|
||||
if ((state == ContainerDataProto.State.OPEN) &&
|
||||
(state != oldState)) {
|
||||
Preconditions.checkState(getMaxSize() > 0);
|
||||
commitSpace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -280,6 +298,41 @@ public synchronized void closeContainer() {
|
||||
setState(ContainerDataProto.State.CLOSED);
|
||||
}
|
||||
|
||||
private void releaseCommitSpace() {
|
||||
long unused = getMaxSize() - getBytesUsed();
|
||||
|
||||
// only if container size < max size
|
||||
if (unused > 0 && committedSpace) {
|
||||
getVolume().incCommittedBytes(0 - unused);
|
||||
}
|
||||
committedSpace = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* add available space in the container to the committed space in the volume.
|
||||
* available space is the number of bytes remaining till max capacity.
|
||||
*/
|
||||
public void commitSpace() {
|
||||
long unused = getMaxSize() - getBytesUsed();
|
||||
ContainerDataProto.State myState = getState();
|
||||
HddsVolume cVol;
|
||||
|
||||
//we don't expect duplicate calls
|
||||
Preconditions.checkState(!committedSpace);
|
||||
|
||||
// Only Open Containers have Committed Space
|
||||
if (myState != ContainerDataProto.State.OPEN) {
|
||||
return;
|
||||
}
|
||||
|
||||
// junit tests do not always set up volume
|
||||
cVol = getVolume();
|
||||
if (unused > 0 && (cVol != null)) {
|
||||
cVol.incCommittedBytes(unused);
|
||||
committedSpace = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of bytes read from the container.
|
||||
* @return the number of bytes read from the container.
|
||||
@ -321,10 +374,20 @@ public long getWriteBytes() {
|
||||
|
||||
/**
|
||||
* Increase the number of bytes write into the container.
|
||||
* Also decrement committed bytes against the bytes written.
|
||||
* @param bytes the number of bytes write into the container.
|
||||
*/
|
||||
public void incrWriteBytes(long bytes) {
|
||||
long unused = getMaxSize() - getBytesUsed();
|
||||
|
||||
this.writeBytes.addAndGet(bytes);
|
||||
|
||||
// only if container size < max size
|
||||
if (committedSpace && unused > 0) {
|
||||
//with this write, container size might breach max size
|
||||
long decrement = Math.min(bytes, unused);
|
||||
this.getVolume().incCommittedBytes(0 - decrement);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -68,6 +68,8 @@ public boolean addContainer(Container container) throws
|
||||
if(containerMap.putIfAbsent(containerId, container) == null) {
|
||||
LOG.debug("Container with container Id {} is added to containerMap",
|
||||
containerId);
|
||||
// wish we could have done this from ContainerData.setState
|
||||
container.getContainerData().commitSpace();
|
||||
return true;
|
||||
} else {
|
||||
LOG.warn("Container already exists with container Id {}", containerId);
|
||||
|
@ -44,6 +44,7 @@
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* HddsVolume represents volume in a datanode. {@link VolumeSet} maintains a
|
||||
@ -85,6 +86,7 @@ public class HddsVolume
|
||||
private String datanodeUuid; // id of the DataNode
|
||||
private long cTime; // creation time of the file system state
|
||||
private int layoutVersion; // layout version of the storage data
|
||||
private final AtomicLong committedBytes; // till Open containers become full
|
||||
|
||||
/**
|
||||
* Run a check on the current volume to determine if it is healthy.
|
||||
@ -168,6 +170,7 @@ private HddsVolume(Builder b) throws IOException {
|
||||
.storageType(b.storageType)
|
||||
.configuredCapacity(b.configuredCapacity);
|
||||
this.volumeInfo = volumeBuilder.build();
|
||||
this.committedBytes = new AtomicLong(0);
|
||||
|
||||
LOG.info("Creating Volume: " + this.hddsRootDir + " of storage type : " +
|
||||
b.storageType + " and capacity : " + volumeInfo.getCapacity());
|
||||
@ -181,6 +184,7 @@ private HddsVolume(Builder b) throws IOException {
|
||||
volumeInfo = null;
|
||||
storageID = UUID.randomUUID().toString();
|
||||
state = VolumeState.FAILED;
|
||||
committedBytes = null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -421,6 +425,23 @@ public enum VolumeState {
|
||||
NOT_INITIALIZED
|
||||
}
|
||||
|
||||
/**
|
||||
* add "delta" bytes to committed space in the volume.
|
||||
* @param delta bytes to add to committed space counter
|
||||
* @return bytes of committed space
|
||||
*/
|
||||
public long incCommittedBytes(long delta) {
|
||||
return committedBytes.addAndGet(delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* return the committed space in the volume.
|
||||
* @return bytes of committed space
|
||||
*/
|
||||
public long getCommittedBytes() {
|
||||
return committedBytes.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Only for testing. Do not use otherwise.
|
||||
*/
|
||||
|
@ -162,6 +162,9 @@ private DispatcherContext getDispatcherContext() {
|
||||
|
||||
private Container addContainer(ContainerSet cSet, long cID)
|
||||
throws IOException {
|
||||
long commitBytesBefore = 0;
|
||||
long commitBytesAfter = 0;
|
||||
long commitIncrement = 0;
|
||||
KeyValueContainerData data = new KeyValueContainerData(cID,
|
||||
ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
|
||||
UUID.randomUUID().toString());
|
||||
@ -169,7 +172,15 @@ private Container addContainer(ContainerSet cSet, long cID)
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
KeyValueContainer container = new KeyValueContainer(data, conf);
|
||||
container.create(volumeSet, volumeChoosingPolicy, SCM_ID);
|
||||
commitBytesBefore = container.getContainerData()
|
||||
.getVolume().getCommittedBytes();
|
||||
cSet.addContainer(container);
|
||||
commitBytesAfter = container.getContainerData()
|
||||
.getVolume().getCommittedBytes();
|
||||
commitIncrement = commitBytesAfter - commitBytesBefore;
|
||||
// did we commit space for the new container?
|
||||
Assert.assertTrue(commitIncrement ==
|
||||
ContainerTestHelper.CONTAINER_MAX_SIZE);
|
||||
return container;
|
||||
}
|
||||
|
||||
@ -328,6 +339,9 @@ public void testListContainer() throws IOException {
|
||||
|
||||
private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException {
|
||||
final int datalen = 1024;
|
||||
long commitBytesBefore = 0;
|
||||
long commitBytesAfter = 0;
|
||||
long commitDecrement = 0;
|
||||
long testContainerID = blockID.getContainerID();
|
||||
Container container = containerSet.getContainer(testContainerID);
|
||||
if (container == null) {
|
||||
@ -337,8 +351,15 @@ private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException {
|
||||
blockID.getLocalID(), 0, 0, datalen);
|
||||
byte[] data = getData(datalen);
|
||||
setDataChecksum(info, data);
|
||||
commitBytesBefore = container.getContainerData()
|
||||
.getVolume().getCommittedBytes();
|
||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
||||
getDispatcherContext());
|
||||
commitBytesAfter = container.getContainerData()
|
||||
.getVolume().getCommittedBytes();
|
||||
commitDecrement = commitBytesBefore - commitBytesAfter;
|
||||
// did we decrement commit bytes by the amount of data we wrote?
|
||||
Assert.assertTrue(commitDecrement == info.getLen());
|
||||
return info;
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user