HDFS-12751. Ozone: SCM: update container allocated size to container db for all the open containers in ContainerStateManager#close. Contributed by Chen Liang.
This commit is contained in:
parent
fd09c2ce5b
commit
26e270b908
@ -100,6 +100,10 @@ public long getAllocatedBytes() {
|
|||||||
return allocatedBytes;
|
return allocatedBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setAllocatedBytes(long allocatedBytes) {
|
||||||
|
this.allocatedBytes = allocatedBytes;
|
||||||
|
}
|
||||||
|
|
||||||
public long getUsedBytes() {
|
public long getUsedBytes() {
|
||||||
return usedBytes;
|
return usedBytes;
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.scm.container;
|
package org.apache.hadoop.ozone.scm.container;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
@ -428,8 +429,58 @@ public void close() throws IOException {
|
|||||||
if (containerLeaseManager != null) {
|
if (containerLeaseManager != null) {
|
||||||
containerLeaseManager.shutdown();
|
containerLeaseManager.shutdown();
|
||||||
}
|
}
|
||||||
|
if (containerStateManager != null) {
|
||||||
|
flushContainerInfo();
|
||||||
|
containerStateManager.close();
|
||||||
|
}
|
||||||
if (containerStore != null) {
|
if (containerStore != null) {
|
||||||
containerStore.close();
|
containerStore.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Since allocatedBytes of a container is only in memory, stored in
|
||||||
|
* containerStateManager, when closing ContainerMapping, we need to update
|
||||||
|
* this in the container store.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public void flushContainerInfo() throws IOException {
|
||||||
|
List<ContainerInfo> containers = containerStateManager.getAllContainers();
|
||||||
|
List<String> failedContainers = new ArrayList<>();
|
||||||
|
for (ContainerInfo info : containers) {
|
||||||
|
// even if some container updated failed, others can still proceed
|
||||||
|
try {
|
||||||
|
byte[] dbKey = info.getContainerName().getBytes(encoding);
|
||||||
|
byte[] containerBytes = containerStore.get(dbKey);
|
||||||
|
// TODO : looks like when a container is deleted, the container is
|
||||||
|
// removed from containerStore but not containerStateManager, so it can
|
||||||
|
// return info of a deleted container. may revisit this in the future,
|
||||||
|
// for now, just skip a not-found container
|
||||||
|
if (containerBytes != null) {
|
||||||
|
OzoneProtos.SCMContainerInfo oldInfoProto =
|
||||||
|
OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
|
||||||
|
ContainerInfo oldInfo = ContainerInfo.fromProtobuf(oldInfoProto);
|
||||||
|
oldInfo.setAllocatedBytes(info.getAllocatedBytes());
|
||||||
|
containerStore.put(dbKey, oldInfo.getProtobuf().toByteArray());
|
||||||
|
} else {
|
||||||
|
LOG.debug("Container state manager has container {} but not found " +
|
||||||
|
"in container store, a deleted container?",
|
||||||
|
info.getContainerName());
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
failedContainers.add(info.getContainerName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!failedContainers.isEmpty()) {
|
||||||
|
throw new IOException("Error in flushing container info from container " +
|
||||||
|
"state manager: " + failedContainers);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public MetadataStore getContainerStore() {
|
||||||
|
return containerStore;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
@ -222,6 +223,17 @@ private void loadExistingContainers(Mapping containerMapping) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the info of all the containers kept by the in-memory mapping.
|
||||||
|
*
|
||||||
|
* @return the list of all container info.
|
||||||
|
*/
|
||||||
|
List<ContainerInfo> getAllContainers() {
|
||||||
|
List<ContainerInfo> list = new ArrayList<>();
|
||||||
|
containers.forEach((key, value) -> list.addAll(value));
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
// 1. Client -> SCM: Begin_create
|
// 1. Client -> SCM: Begin_create
|
||||||
// 2. Client -> Datanode: create
|
// 2. Client -> Datanode: create
|
||||||
// 3. Client -> SCM: complete {SCM:Creating ->OK}
|
// 3. Client -> SCM: complete {SCM:Creating ->OK}
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for ContainerStateManager.
|
* Tests for ContainerStateManager.
|
||||||
@ -245,6 +246,43 @@ public void testUpdateContainerState() throws IOException {
|
|||||||
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
|
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
|
||||||
OzoneProtos.LifeCycleState.CLOSED).size();
|
OzoneProtos.LifeCycleState.CLOSED).size();
|
||||||
Assert.assertEquals(1, containers);
|
Assert.assertEquals(1, containers);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdatingAllocatedBytes() throws Exception {
|
||||||
|
String container1 = "container" + RandomStringUtils.randomNumeric(5);
|
||||||
|
scm.allocateContainer(xceiverClientManager.getType(),
|
||||||
|
xceiverClientManager.getFactor(), container1);
|
||||||
|
scmContainerMapping.updateContainerState(container1,
|
||||||
|
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
|
||||||
|
scmContainerMapping.updateContainerState(container1,
|
||||||
|
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
|
||||||
|
|
||||||
|
Random ran = new Random();
|
||||||
|
long allocatedSize = 0;
|
||||||
|
for (int i = 0; i<5; i++) {
|
||||||
|
long size = Math.abs(ran.nextLong() % OzoneConsts.GB);
|
||||||
|
allocatedSize += size;
|
||||||
|
// trigger allocating bytes by calling getMatchingContainer
|
||||||
|
ContainerInfo info = stateManager
|
||||||
|
.getMatchingContainer(size, OzoneProtos.Owner.OZONE,
|
||||||
|
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
|
||||||
|
OzoneProtos.LifeCycleState.OPEN);
|
||||||
|
Assert.assertEquals(container1, info.getContainerName());
|
||||||
|
|
||||||
|
ContainerMapping containerMapping =
|
||||||
|
(ContainerMapping)scmContainerMapping;
|
||||||
|
// manually trigger a flush, this will persist the allocated bytes value
|
||||||
|
// to disk
|
||||||
|
containerMapping.flushContainerInfo();
|
||||||
|
|
||||||
|
// the persisted value should always be equal to allocated size.
|
||||||
|
byte[] containerBytes =
|
||||||
|
containerMapping.getContainerStore().get(container1.getBytes());
|
||||||
|
OzoneProtos.SCMContainerInfo infoProto =
|
||||||
|
OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
|
||||||
|
ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto);
|
||||||
|
Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user