diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index d2fa2c8adc..ec70dbd96e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -71,6 +71,8 @@ public abstract class ContainerData {
 
   private final long maxSize;
 
+  private boolean committedSpace;
+
   //ID of the pipeline where this container is created
   private String originPipelineId;
   //ID of the datanode where this container is created
@@ -184,7 +186,23 @@ public synchronized ContainerDataProto.State getState() {
    * @param state
    */
   public synchronized void setState(ContainerDataProto.State state) {
+    ContainerDataProto.State oldState = this.state;
     this.state = state;
+
+    if ((oldState == ContainerDataProto.State.OPEN) &&
+        (state != oldState)) {
+      releaseCommitSpace();
+    }
+
+    /*
+     * Re-commit space when a container transitions back to OPEN,
+     * e.g. when an attempt to close the container failed.
+     */
+    if ((state == ContainerDataProto.State.OPEN) &&
+        (state != oldState)) {
+      Preconditions.checkState(getMaxSize() > 0);
+      commitSpace();
+    }
   }
 
   /**
@@ -280,6 +298,41 @@ public synchronized void closeContainer() {
     setState(ContainerDataProto.State.CLOSED);
   }
 
+  private void releaseCommitSpace() {
+    long unused = getMaxSize() - getBytesUsed();
+
+    // only if container size < max size
+    if (unused > 0 && committedSpace) {
+      getVolume().incCommittedBytes(0 - unused);
+    }
+    committedSpace = false;
+  }
+
+  /**
+   * add available space in the container to the committed space in the volume.
+   * available space is the number of bytes remaining till max capacity.
+   */
+  public void commitSpace() {
+    long unused = getMaxSize() - getBytesUsed();
+    ContainerDataProto.State myState = getState();
+    HddsVolume cVol;
+
+    //we don't expect duplicate calls
+    Preconditions.checkState(!committedSpace);
+
+    // Only Open Containers have Committed Space
+    if (myState != ContainerDataProto.State.OPEN) {
+      return;
+    }
+
+    // junit tests do not always set up volume
+    cVol = getVolume();
+    if (unused > 0 && (cVol != null)) {
+      cVol.incCommittedBytes(unused);
+      committedSpace = true;
+    }
+  }
+
   /**
    * Get the number of bytes read from the container.
    * @return the number of bytes read from the container.
@@ -321,10 +374,20 @@ public long getWriteBytes() {
 
   /**
    * Increase the number of bytes write into the container.
+   * Also decrement committed bytes against the bytes written.
    * @param bytes the number of bytes write into the container.
    */
   public void incrWriteBytes(long bytes) {
+    long unused = getMaxSize() - getBytesUsed();
+
     this.writeBytes.addAndGet(bytes);
+
+    // only if container size < max size
+    if (committedSpace && unused > 0) {
+      //with this write, container size might breach max size
+      long decrement = Math.min(bytes, unused);
+      this.getVolume().incCommittedBytes(0 - decrement);
+    }
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 4a7a950827..7dbcbef596 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -68,6 +68,8 @@ public boolean addContainer(Container container) throws
     if(containerMap.putIfAbsent(containerId, container) == null) {
       LOG.debug("Container with container Id {} is added to containerMap",
           containerId);
+      // TODO: ideally this belongs in ContainerData.setState
+      container.getContainerData().commitSpace();
       return true;
     } else {
       LOG.warn("Container already exists with container Id {}", containerId);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index ab18273520..3a2034580a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -44,6 +44,7 @@
 import java.io.IOException;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * HddsVolume represents volume in a datanode. {@link VolumeSet} maintains a
@@ -85,6 +86,7 @@ public class HddsVolume
   private String datanodeUuid; // id of the DataNode
   private long cTime; // creation time of the file system state
   private int layoutVersion; // layout version of the storage data
+  private final AtomicLong committedBytes; // till Open containers become full
 
   /**
    * Run a check on the current volume to determine if it is healthy.
@@ -168,6 +170,7 @@ private HddsVolume(Builder b) throws IOException {
           .storageType(b.storageType)
           .configuredCapacity(b.configuredCapacity);
       this.volumeInfo = volumeBuilder.build();
+      this.committedBytes = new AtomicLong(0);
 
       LOG.info("Creating Volume: " + this.hddsRootDir + " of storage type : " +
           b.storageType + " and capacity : " + volumeInfo.getCapacity());
@@ -181,6 +184,7 @@ private HddsVolume(Builder b) throws IOException {
       volumeInfo = null;
       storageID = UUID.randomUUID().toString();
       state = VolumeState.FAILED;
+      committedBytes = null;
     }
   }
 
@@ -421,6 +425,23 @@ public enum VolumeState {
     NOT_INITIALIZED
   }
 
+  /**
+   * add "delta" bytes to committed space in the volume.
+   * @param delta bytes to add to committed space counter
+   * @return bytes of committed space
+   */
+  public long incCommittedBytes(long delta) {
+    return committedBytes.addAndGet(delta);
+  }
+
+  /**
+   * return the committed space in the volume.
+   * @return bytes of committed space
+   */
+  public long getCommittedBytes() {
+    return committedBytes.get();
+  }
+
   /**
    * Only for testing. Do not use otherwise.
    */
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 838dd9ebb7..2fd169c03c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -162,6 +162,9 @@ private DispatcherContext getDispatcherContext() {
 
   private Container addContainer(ContainerSet cSet, long cID)
       throws IOException {
+    long commitBytesBefore = 0;
+    long commitBytesAfter = 0;
+    long commitIncrement = 0;
     KeyValueContainerData data = new KeyValueContainerData(cID,
         ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
         UUID.randomUUID().toString());
@@ -169,7 +172,15 @@ private Container addContainer(ContainerSet cSet, long cID)
     data.addMetadata("owner)", "bilbo");
     KeyValueContainer container = new KeyValueContainer(data, conf);
     container.create(volumeSet, volumeChoosingPolicy, SCM_ID);
+    commitBytesBefore = container.getContainerData()
+        .getVolume().getCommittedBytes();
     cSet.addContainer(container);
+    commitBytesAfter = container.getContainerData()
+        .getVolume().getCommittedBytes();
+    commitIncrement = commitBytesAfter - commitBytesBefore;
+    // did we commit space for the new container?
+    Assert.assertEquals(ContainerTestHelper.CONTAINER_MAX_SIZE,
+        commitIncrement);
     return container;
   }
 
@@ -328,6 +339,9 @@ public void testListContainer() throws IOException {
 
   private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException {
     final int datalen = 1024;
+    long commitBytesBefore = 0;
+    long commitBytesAfter = 0;
+    long commitDecrement = 0;
     long testContainerID = blockID.getContainerID();
     Container container = containerSet.getContainer(testContainerID);
     if (container == null) {
@@ -337,8 +351,15 @@ private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException {
         blockID.getLocalID(), 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
+    commitBytesBefore = container.getContainerData()
+        .getVolume().getCommittedBytes();
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
         getDispatcherContext());
+    commitBytesAfter = container.getContainerData()
+        .getVolume().getCommittedBytes();
+    commitDecrement = commitBytesBefore - commitBytesAfter;
+    // did we decrement commit bytes by the amount of data we wrote?
+    Assert.assertEquals(info.getLen(), commitDecrement);
     return info;
   }