HDDS-123:ContainerSet class to manage ContainerMap. Contributed by Bharat Viswanadham
This commit is contained in:
parent
f26d3466d7
commit
977c8cd166
@ -56,6 +56,7 @@ public class ContainerData {
|
||||
private final AtomicLong writeBytes;
|
||||
private final AtomicLong readCount;
|
||||
private final AtomicLong writeCount;
|
||||
private final AtomicLong bytesUsed;
|
||||
|
||||
|
||||
/**
|
||||
@ -73,6 +74,7 @@ public class ContainerData {
|
||||
this.readBytes = new AtomicLong(0L);
|
||||
this.writeCount = new AtomicLong(0L);
|
||||
this.writeBytes = new AtomicLong(0L);
|
||||
this.bytesUsed = new AtomicLong(0L);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -231,4 +233,40 @@ public class ContainerData {
|
||||
this.writeCount.incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the number of bytes used by the container.
|
||||
* @param used
|
||||
*/
|
||||
public void setBytesUsed(long used) {
|
||||
this.bytesUsed.set(used);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of bytes used by the container.
|
||||
* @return the number of bytes used by the container.
|
||||
*/
|
||||
public long getBytesUsed() {
|
||||
return bytesUsed.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase the number of bytes used by the container.
|
||||
* @param used number of bytes used by the container.
|
||||
* @return the current number of bytes used by the container afert increase.
|
||||
*/
|
||||
public long incrBytesUsed(long used) {
|
||||
return this.bytesUsed.addAndGet(used);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Decrease the number of bytes used by the container.
|
||||
* @param reclaimed the number of bytes reclaimed from the container.
|
||||
* @return the current number of bytes used by the container after decrease.
|
||||
*/
|
||||
public long decrBytesUsed(long reclaimed) {
|
||||
return this.bytesUsed.addAndGet(-1L * reclaimed);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,239 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||
.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.Result.INVALID_CONTAINER_STATE;
|
||||
|
||||
/**
|
||||
* Class that manages Containers created on the datanode.
|
||||
*/
|
||||
public class ContainerSet {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ContainerSet.class);
|
||||
|
||||
private final ConcurrentSkipListMap<Long, Container> containerMap = new
|
||||
ConcurrentSkipListMap<>();
|
||||
|
||||
/**
|
||||
* Add Container to container map.
|
||||
* @param container
|
||||
* @return If container is added to containerMap returns true, otherwise
|
||||
* false
|
||||
*/
|
||||
public boolean addContainer(Container container) throws
|
||||
StorageContainerException {
|
||||
Preconditions.checkNotNull(container, "container cannot be null");
|
||||
|
||||
long containerId = container.getContainerData().getContainerId();
|
||||
if(containerMap.putIfAbsent(containerId, container) == null) {
|
||||
LOG.debug("Container with container Id {} is added to containerMap",
|
||||
containerId);
|
||||
return true;
|
||||
} else {
|
||||
LOG.debug("Container already exists with container Id {}", containerId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Container with specified containerId.
|
||||
* @param containerId
|
||||
* @return Container
|
||||
*/
|
||||
public Container getContainer(long containerId) {
|
||||
Preconditions.checkState(containerId >= 0,
|
||||
"Container Id cannot be negative.");
|
||||
return containerMap.get(containerId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the Container matching with specified containerId.
|
||||
* @param containerId
|
||||
* @return If container is removed from containerMap returns true, otherwise
|
||||
* false
|
||||
*/
|
||||
public boolean removeContainer(long containerId) {
|
||||
Preconditions.checkState(containerId >= 0,
|
||||
"Container Id cannot be negative.");
|
||||
Container removed = containerMap.remove(containerId);
|
||||
if(removed == null) {
|
||||
LOG.debug("Container with containerId {} is not present in " +
|
||||
"containerMap", containerId);
|
||||
return false;
|
||||
} else {
|
||||
LOG.debug("Container with containerId {} is removed from containerMap",
|
||||
containerId);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return number of containers in container map.
|
||||
* @return container count
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public int containerCount() {
|
||||
return containerMap.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an container Iterator over {@link ContainerSet#containerMap}.
|
||||
* @return Iterator<Container>
|
||||
*/
|
||||
public Iterator<Container> getContainerIterator() {
|
||||
return containerMap.values().iterator();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an containerMap iterator over {@link ContainerSet#containerMap}.
|
||||
* @return containerMap Iterator
|
||||
*/
|
||||
public Iterator<Map.Entry<Long, Container>> getContainerMapIterator() {
|
||||
return containerMap.entrySet().iterator();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A simple interface for container Iterations.
|
||||
* <p/>
|
||||
* This call make no guarantees about consistency of the data between
|
||||
* different list calls. It just returns the best known data at that point of
|
||||
* time. It is possible that using this iteration you can miss certain
|
||||
* container from the listing.
|
||||
*
|
||||
* @param startContainerId - Return containers with Id >= startContainerId.
|
||||
* @param count - how many to return
|
||||
* @param data - Actual containerData
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
public void listContainer(long startContainerId, long count,
|
||||
List<ContainerData> data) throws
|
||||
StorageContainerException {
|
||||
Preconditions.checkNotNull(data,
|
||||
"Internal assertion: data cannot be null");
|
||||
Preconditions.checkState(startContainerId >= 0,
|
||||
"Start container Id cannot be negative");
|
||||
Preconditions.checkState(count > 0,
|
||||
"max number of containers returned " +
|
||||
"must be positive");
|
||||
LOG.debug("listContainer returns containerData starting from {} of count " +
|
||||
"{}", startContainerId, count);
|
||||
ConcurrentNavigableMap<Long, Container> map;
|
||||
if (startContainerId == 0) {
|
||||
map = containerMap.tailMap(containerMap.firstKey(), true);
|
||||
} else {
|
||||
map = containerMap.tailMap(startContainerId, true);
|
||||
}
|
||||
int currentCount = 0;
|
||||
for (Container entry : map.values()) {
|
||||
if (currentCount < count) {
|
||||
data.add(entry.getContainerData());
|
||||
currentCount++;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get container report.
|
||||
*
|
||||
* @return The container report.
|
||||
* @throws IOException
|
||||
*/
|
||||
public ContainerReportsProto getContainerReport() throws IOException {
|
||||
LOG.debug("Starting container report iteration.");
|
||||
|
||||
// No need for locking since containerMap is a ConcurrentSkipListMap
|
||||
// And we can never get the exact state since close might happen
|
||||
// after we iterate a point.
|
||||
List<Container> containers = containerMap.values().stream().collect(
|
||||
Collectors.toList());
|
||||
|
||||
ContainerReportsProto.Builder crBuilder =
|
||||
ContainerReportsProto.newBuilder();
|
||||
|
||||
|
||||
for (Container container: containers) {
|
||||
long containerId = container.getContainerData().getContainerId();
|
||||
ContainerInfo.Builder ciBuilder = ContainerInfo.newBuilder();
|
||||
ContainerData containerData = container.getContainerData();
|
||||
ciBuilder.setContainerID(containerId)
|
||||
.setReadCount(containerData.getReadCount())
|
||||
.setWriteCount(containerData.getWriteCount())
|
||||
.setReadBytes(containerData.getReadBytes())
|
||||
.setWriteBytes(containerData.getWriteBytes())
|
||||
.setUsed(containerData.getBytesUsed())
|
||||
.setState(getState(containerData));
|
||||
|
||||
crBuilder.addReports(ciBuilder.build());
|
||||
}
|
||||
|
||||
return crBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns LifeCycle State of the container.
|
||||
* @param containerData - ContainerData
|
||||
* @return LifeCycle State of the container
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
private HddsProtos.LifeCycleState getState(ContainerData containerData)
|
||||
throws StorageContainerException {
|
||||
HddsProtos.LifeCycleState state;
|
||||
switch (containerData.getState()) {
|
||||
case OPEN:
|
||||
state = HddsProtos.LifeCycleState.OPEN;
|
||||
break;
|
||||
case CLOSING:
|
||||
state = HddsProtos.LifeCycleState.CLOSING;
|
||||
break;
|
||||
case CLOSED:
|
||||
state = HddsProtos.LifeCycleState.CLOSED;
|
||||
break;
|
||||
default:
|
||||
throw new StorageContainerException("Invalid Container state found: " +
|
||||
containerData.getContainerId(), INVALID_CONTAINER_STATE);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
}
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
|
||||
|
||||
@ -40,12 +41,15 @@ public class KeyValueContainer implements Container {
|
||||
private KeyValueContainerData containerData;
|
||||
|
||||
public KeyValueContainer(KeyValueContainerData containerData) {
|
||||
Preconditions.checkNotNull(containerData, "KeyValueContainerData cannot " +
|
||||
"be null");
|
||||
this.containerData = containerData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void create(ContainerData cData) throws StorageContainerException {
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -61,8 +65,8 @@ public class KeyValueContainer implements Container {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerData getContainerData() throws StorageContainerException {
|
||||
return null;
|
||||
public ContainerData getContainerData() {
|
||||
return containerData;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,169 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Class used to test ContainerSet operations.
|
||||
*/
|
||||
public class TestContainerSet {
|
||||
|
||||
@Test
|
||||
public void testAddGetRemoveContainer() throws StorageContainerException {
|
||||
ContainerSet containerSet = new ContainerSet();
|
||||
long containerId = 100L;
|
||||
ContainerProtos.ContainerLifeCycleState state = ContainerProtos
|
||||
.ContainerLifeCycleState.CLOSED;
|
||||
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(
|
||||
ContainerProtos.ContainerType.KeyValueContainer, containerId);
|
||||
kvData.setState(state);
|
||||
KeyValueContainer keyValueContainer = new KeyValueContainer(kvData);
|
||||
|
||||
//addContainer
|
||||
boolean result = containerSet.addContainer(keyValueContainer);
|
||||
assertTrue(result);
|
||||
result = containerSet.addContainer(keyValueContainer);
|
||||
assertFalse(result);
|
||||
|
||||
//getContainer
|
||||
KeyValueContainer container = (KeyValueContainer) containerSet
|
||||
.getContainer(containerId);
|
||||
KeyValueContainerData keyValueContainerData = (KeyValueContainerData)
|
||||
container.getContainerData();
|
||||
assertEquals(containerId, keyValueContainerData.getContainerId());
|
||||
assertEquals(state, keyValueContainerData.getState());
|
||||
assertNull(containerSet.getContainer(1000L));
|
||||
|
||||
//removeContainer
|
||||
assertTrue(containerSet.removeContainer(containerId));
|
||||
assertFalse(containerSet.removeContainer(1000L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIteratorsAndCount() throws StorageContainerException {
|
||||
|
||||
ContainerSet containerSet = createContainerSet();
|
||||
|
||||
assertEquals(10, containerSet.containerCount());
|
||||
|
||||
// Using containerIterator.
|
||||
Iterator<Container> containerIterator = containerSet.getContainerIterator();
|
||||
|
||||
int count = 0;
|
||||
while(containerIterator.hasNext()) {
|
||||
Container kv = containerIterator.next();
|
||||
ContainerData containerData = kv.getContainerData();
|
||||
long containerId = containerData.getContainerId();
|
||||
if (containerId%2 == 0) {
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
|
||||
containerData.getState());
|
||||
} else {
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN,
|
||||
containerData.getState());
|
||||
}
|
||||
count++;
|
||||
}
|
||||
assertEquals(10, count);
|
||||
|
||||
//Using containerMapIterator.
|
||||
Iterator<Map.Entry<Long, Container>> containerMapIterator = containerSet
|
||||
.getContainerMapIterator();
|
||||
|
||||
count = 0;
|
||||
while (containerMapIterator.hasNext()) {
|
||||
Container kv = containerMapIterator.next().getValue();
|
||||
ContainerData containerData = kv.getContainerData();
|
||||
long containerId = containerData.getContainerId();
|
||||
if (containerId%2 == 0) {
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
|
||||
containerData.getState());
|
||||
} else {
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN,
|
||||
containerData.getState());
|
||||
}
|
||||
count++;
|
||||
}
|
||||
assertEquals(10, count);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGetContainerReport() throws IOException {
|
||||
|
||||
ContainerSet containerSet = createContainerSet();
|
||||
|
||||
ContainerReportsProto containerReportsRequestProto = containerSet
|
||||
.getContainerReport();
|
||||
|
||||
assertEquals(10, containerReportsRequestProto.getReportsList().size());
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testListContainer() throws StorageContainerException {
|
||||
ContainerSet containerSet = createContainerSet();
|
||||
|
||||
List<ContainerData> result = new ArrayList<>();
|
||||
containerSet.listContainer(2, 5, result);
|
||||
|
||||
assertEquals(5, result.size());
|
||||
|
||||
for(ContainerData containerData : result) {
|
||||
assertTrue(containerData.getContainerId() >=2 && containerData
|
||||
.getContainerId()<=6);
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerSet createContainerSet() throws StorageContainerException {
|
||||
ContainerSet containerSet = new ContainerSet();
|
||||
for (int i=0; i<10; i++) {
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(
|
||||
ContainerProtos.ContainerType.KeyValueContainer, i);
|
||||
if (i%2 == 0) {
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
|
||||
} else {
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
|
||||
}
|
||||
KeyValueContainer kv = new KeyValueContainer(kvData);
|
||||
containerSet.addContainer(kv);
|
||||
}
|
||||
return containerSet;
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user