HDFS-12521. Ozone: SCM should read all Container info into memory when booting up. Contributed by Lokesh Jain.

Authored by Anu Engineer on 2017-10-26 13:01:13 -07:00; committed by Owen O'Malley
parent 6291ca1e32
commit 76d34bca62
15 changed files with 410 additions and 133 deletions
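In short, ContainerMapping now hands itself to ContainerStateManager, which pre-populates its in-memory (owner, type, factor, state) queues from the container store when SCM boots. A condensed, hedged sketch of that flow follows; the full version is in the ContainerStateManager hunks below and additionally tolerates an empty container store.

// Condensed from this patch's ContainerStateManager changes; not verbatim.
public ContainerStateManager(Configuration configuration,
    Mapping containerMapping, final long cacheSize) throws IOException {
  // ... state-machine, block-size and cache-size setup elided ...
  containers = new HashMap<>();
  initializeContainerMaps();                 // one empty queue per ContainerKey
  loadExistingContainers(containerMapping);  // new: reload persisted containers
  containerCloseQueue = new ConcurrentLinkedQueue<BlockContainerInfo>();
}

private void loadExistingContainers(Mapping containerMapping) throws IOException {
  for (ContainerInfo container :
      containerMapping.listContainer(null, null, Integer.MAX_VALUE)) {
    ContainerKey key = new ContainerKey(container.getOwner(),
        container.getPipeline().getType(),
        container.getPipeline().getFactor(), container.getState());
    containers.get(key).add(new BlockContainerInfo(container, 0));
  }
}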

View File

@@ -24,6 +24,7 @@
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
@@ -100,6 +101,7 @@ public Pipeline createContainer(String containerId)
createPipeline(client, pipeline);
}
// TODO : Container Client State needs to be updated.
// TODO : Return ContainerInfo instead of Pipeline
createContainer(containerId, client, pipeline);
return pipeline;
} finally {
@@ -201,6 +203,7 @@ public Pipeline createContainer(OzoneProtos.ReplicationType type,
createPipeline(client, pipeline);
}
// TODO : Return ContainerInfo instead of Pipeline
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline);
createContainer(containerId, client, pipeline);
@@ -273,7 +276,7 @@ public void deleteContainer(Pipeline pipeline, boolean force)
* {@inheritDoc}
*/
@Override
public List<Pipeline> listContainer(String startName,
public List<ContainerInfo> listContainer(String startName,
String prefixName, int count)
throws IOException {
return storageContainerLocationClient.listContainer(

View File

@@ -19,6 +19,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@@ -71,7 +72,7 @@ public interface ScmClient {
void deleteContainer(Pipeline pipeline, boolean force) throws IOException;
/**
* Lists a range of containers and get the pipelines info.
* Lists a range of containers and get their info.
*
* @param startName start name, if null, start searching at the head.
* @param prefixName prefix name, if null, then filter is disabled.
@@ -82,8 +83,8 @@ public interface ScmClient {
* @return a list of pipeline.
* @throws IOException
*/
List<Pipeline> listContainer(String startName, String prefixName, int count)
throws IOException;
List<ContainerInfo> listContainer(String startName, String prefixName,
int count) throws IOException;
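For illustration only (not part of this patch), a caller of the updated method now receives ContainerInfo objects and can still reach the pipeline through ContainerInfo#getPipeline(); the helper class name and the count of 100 below are assumptions:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;

// Hypothetical caller, shown only to illustrate the new return type.
final class ListContainersExample {
  static void dump(ScmClient client) throws IOException {
    // null start name: search from the head; null prefix: no filtering;
    // 100 caps the number of returned entries, per the javadoc above.
    List<ContainerInfo> batch = client.listContainer(null, null, 100);
    for (ContainerInfo container : batch) {
      System.out.println(container.getContainerName()
          + " -> " + container.getState());
    }
  }
}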
/**
* Read meta data from an existing container.

View File

@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@@ -50,7 +51,7 @@ Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
Pipeline getContainer(String containerName) throws IOException;
/**
* Ask SCM a list of pipelines with a range of container names
* Ask SCM a list of containers with a range of container names
* and the limit of count.
* Search container names between start name(exclusive), and
* use prefix name to filter the result. the max size of the
@@ -62,11 +63,11 @@ Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
* Usually the count will be replace with a very big
* value instead of being unlimited in case the db is very big)
*
* @return a list of pipeline.
* @return a list of container.
* @throws IOException
*/
List<Pipeline> listContainer(String startName, String prefixName, int count)
throws IOException;
List<ContainerInfo> listContainer(String startName, String prefixName,
int count) throws IOException;
/**
* Deletes a container in SCM.

View File

@@ -25,6 +25,7 @@
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
@@ -132,7 +133,7 @@ public Pipeline getContainer(String containerName) throws IOException {
* {@inheritDoc}
*/
@Override
public List<Pipeline> listContainer(String startName, String prefixName,
public List<ContainerInfo> listContainer(String startName, String prefixName,
int count) throws IOException {
ListContainerRequestProto.Builder builder = ListContainerRequestProto
.newBuilder();
@@ -148,11 +149,12 @@ public List<Pipeline> listContainer(String startName, String prefixName,
try {
ListContainerResponseProto response =
rpcProxy.listContainer(NULL_RPC_CONTROLLER, request);
List<Pipeline> pipelineList = new ArrayList<>();
for (OzoneProtos.Pipeline pipelineProto : response.getPipelineList()) {
pipelineList.add(Pipeline.getFromProtoBuf(pipelineProto));
List<ContainerInfo> containerList = new ArrayList<>();
for (OzoneProtos.SCMContainerInfo containerInfoProto : response
.getContainersList()) {
containerList.add(ContainerInfo.fromProtobuf(containerInfoProto));
}
return pipelineList;
return containerList;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@@ -80,7 +80,7 @@ message ListContainerRequestProto {
}
message ListContainerResponseProto {
repeated hadoop.hdfs.ozone.Pipeline pipeline = 1;
repeated hadoop.hdfs.ozone.SCMContainerInfo containers = 1;
}
message DeleteContainerRequestProto {

View File

@@ -29,6 +29,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerRequestProto;
@@ -118,12 +119,12 @@ public ListContainerResponseProto listContainer(RpcController controller,
}
count = request.getCount();
List<Pipeline> pipelineList = impl.listContainer(startName,
prefixName, count);
List<ContainerInfo> containerList =
impl.listContainer(startName, prefixName, count);
ListContainerResponseProto.Builder builder =
ListContainerResponseProto.newBuilder();
for (Pipeline pipeline : pipelineList) {
builder.addPipeline(pipeline.getProtobufMessage());
for (ContainerInfo container : containerList) {
builder.addContainers(container.getProtobuf());
}
return builder.build();
} catch (IOException e) {

View File

@@ -414,7 +414,7 @@ ContainerInfo getContainerInfo(String containerName) throws IOException {
* {@inheritDoc}
*/
@Override
public List<Pipeline> listContainer(String startName,
public List<ContainerInfo> listContainer(String startName,
String prefixName, int count) throws IOException {
return scmContainerManager.listContainer(startName, prefixName, count);
}
@@ -828,6 +828,14 @@ public int getNodeCount(NodeState nodestate) {
return scmNodeManager.getNodeCount(nodestate);
}
/**
* Returns SCM container manager.
*/
@VisibleForTesting
public Mapping getScmContainerManager() {
return scmContainerManager;
}
/**
* Returns node manager.
* @return - Node Manager

View File

@@ -24,6 +24,7 @@
import org.apache.hadoop.ozone.scm.cli.OzoneCommandHandler;
import org.apache.hadoop.ozone.web.utils.JsonUtils;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException;
@@ -82,12 +83,12 @@ public void execute(CommandLine cmd) throws IOException {
}
}
List<Pipeline> pipelineList =
List<ContainerInfo> containerList =
getScmClient().listContainer(startName, prefixName, count);
// Output data list
for (Pipeline pipeline : pipelineList) {
outputContainerPipeline(pipeline);
for (ContainerInfo container : containerList) {
outputContainerPipeline(container.getPipeline());
}
}

View File

@@ -34,7 +34,6 @@
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
@@ -107,8 +106,8 @@ public ContainerMapping(
this.lock = new ReentrantLock();
this.pipelineSelector = new PipelineSelector(nodeManager, conf);
this.containerStateManager = new ContainerStateManager(conf, +this
.cacheSize * OzoneConsts.MB);
this.containerStateManager =
new ContainerStateManager(conf, this, this.cacheSize * OzoneConsts.MB);
LOG.trace("Container State Manager created.");
long containerCreationLeaseTimeout = conf.getLong(
@@ -144,10 +143,9 @@ public ContainerInfo getContainer(final String containerName) throws
/** {@inheritDoc} */
@Override
public List<Pipeline> listContainer(String startName, String prefixName,
int count)
throws IOException {
List<Pipeline> pipelineList = new ArrayList<>();
public List<ContainerInfo> listContainer(String startName,
String prefixName, int count) throws IOException {
List<ContainerInfo> containerList = new ArrayList<>();
lock.lock();
try {
if (containerStore.isEmpty()) {
@@ -160,7 +158,6 @@ public List<Pipeline> listContainer(String startName, String prefixName,
containerStore.getSequentialRangeKVs(startKey, count, prefixFilter);
// Transform the values into the pipelines.
// TODO: return list of ContainerInfo instead of pipelines.
// TODO: filter by container state
for (Map.Entry<byte[], byte[]> entry : range) {
ContainerInfo containerInfo =
@@ -168,12 +165,12 @@ public List<Pipeline> listContainer(String startName, String prefixName,
OzoneProtos.SCMContainerInfo.PARSER.parseFrom(
entry.getValue()));
Preconditions.checkNotNull(containerInfo);
pipelineList.add(containerInfo.getPipeline());
containerList.add(containerInfo);
}
} finally {
lock.unlock();
}
return pipelineList;
return containerList;
}
/**

View File

@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.scm.container;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -44,9 +45,11 @@
import java.util.Queue;
import java.util.Set;
import java.util.PriorityQueue;
import java.util.List;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
@@ -126,7 +129,7 @@ public class ContainerStateManager {
// A map that maintains the ContainerKey to Containers of that type ordered
// by last access time.
private final Lock writeLock;
private final ReadWriteLock lock;
private final Queue<BlockContainerInfo> containerCloseQueue;
private Map<ContainerKey, PriorityQueue<BlockContainerInfo>> containers;
@@ -136,8 +139,8 @@ public class ContainerStateManager {
* <p>
* TODO : Add Container Tags so we know which containers are owned by SCM.
*/
public ContainerStateManager(Configuration configuration, final long
cacheSize) throws IOException {
public ContainerStateManager(Configuration configuration,
Mapping containerMapping, final long cacheSize) throws IOException {
this.cacheSize = cacheSize;
// Initialize the container state machine.
@@ -160,9 +163,10 @@ public ContainerStateManager(Configuration configuration, final long
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
writeLock = new ReentrantLock();
lock = new ReentrantReadWriteLock();
containers = new HashMap<>();
initializeContainerMaps(containers);
initializeContainerMaps();
loadExistingContainers(containerMapping);
containerCloseQueue = new ConcurrentLinkedQueue<BlockContainerInfo>();
}
@@ -179,22 +183,46 @@ public ContainerStateManager(Configuration configuration, final long
* of these {ALLOCATED, CREATING, OPEN, CLOSED, DELETING, DELETED} container
* states
*/
private void initializeContainerMaps(Map containerMaps) {
private void initializeContainerMaps() {
// Called only from Ctor path, hence no lock is held.
Preconditions.checkNotNull(containerMaps);
Preconditions.checkNotNull(containers);
for (OzoneProtos.Owner owner : OzoneProtos.Owner.values()) {
for (ReplicationType type : ReplicationType.values()) {
for (ReplicationFactor factor : ReplicationFactor.values()) {
for (LifeCycleState state : LifeCycleState.values()) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
PriorityQueue<BlockContainerInfo> queue = new PriorityQueue<>();
containerMaps.put(key, queue);
containers.put(key, queue);
}
}
}
}
}
/**
* Load containers from the container store into the containerMaps.
*
* @param containerMapping -- Mapping object containing container store.
*/
private void loadExistingContainers(Mapping containerMapping) {
try {
List<ContainerInfo> containerList =
containerMapping.listContainer(null, null, Integer.MAX_VALUE);
for (ContainerInfo container : containerList) {
ContainerKey key = new ContainerKey(container.getOwner(),
container.getPipeline().getType(),
container.getPipeline().getFactor(), container.getState());
BlockContainerInfo blockContainerInfo =
new BlockContainerInfo(container, 0);
((PriorityQueue) containers.get(key)).add(blockContainerInfo);
}
} catch (IOException e) {
if (!e.getMessage().equals("No container exists in current db")) {
LOG.info("Could not list the containers", e);
}
}
}
// 1. Client -> SCM: Begin_create
// 2. Client -> Datanode: create
// 3. Client -> SCM: complete {SCM:Creating ->OK}
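The Begin_create / create / complete handshake noted in the comments above maps onto the BEGIN_CREATE and COMPLETE_CREATE lifecycle events handled by updateContainerState further down. A minimal, hedged fragment of the SCM-side walk, reusing the fixture names (scm, xceiverClientManager, scmContainerMapping) from TestContainerStateManager at the end of this patch:

// Illustrative fragment, not part of the patch; the container name is made up.
String name = "container01";
scm.allocateContainer(xceiverClientManager.getType(),
    xceiverClientManager.getFactor(), name);          // enters ALLOCATED
scmContainerMapping.updateContainerState(name,
    OzoneProtos.LifeCycleEvent.BEGIN_CREATE);         // ALLOCATED -> CREATING
scmContainerMapping.updateContainerState(name,
    OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);      // CREATING -> OPEN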
@@ -271,7 +299,7 @@ public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
Preconditions.checkNotNull(info);
BlockContainerInfo blockInfo = new BlockContainerInfo(info, 0);
blockInfo.setLastUsed(Time.monotonicNow());
writeLock.lock();
lock.writeLock().lock();
try {
ContainerKey key = new ContainerKey(owner, type, replicationFactor,
blockInfo.getState());
@@ -280,7 +308,7 @@ public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
queue.add(blockInfo);
LOG.trace("New container allocated: {}", blockInfo);
} finally {
writeLock.unlock();
lock.writeLock().unlock();
}
return info;
}
@@ -321,7 +349,7 @@ public OzoneProtos.LifeCycleState updateContainerState(BlockContainerInfo
ContainerKey newKey = new ContainerKey(info.getOwner(), pipeline.getType(),
pipeline.getFactor(), newState);
writeLock.lock();
lock.writeLock().lock();
try {
PriorityQueue<BlockContainerInfo> currentQueue = containers.get(oldKey);
@@ -349,7 +377,7 @@ public OzoneProtos.LifeCycleState updateContainerState(BlockContainerInfo
return newState;
} finally {
writeLock.unlock();
lock.writeLock().unlock();
}
}
@@ -367,7 +395,7 @@ public BlockContainerInfo getMatchingContainer(final long size,
Owner owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
writeLock.lock();
lock.writeLock().lock();
try {
PriorityQueue<BlockContainerInfo> queue = containers.get(key);
if (queue.size() == 0) {
@@ -382,7 +410,7 @@ public BlockContainerInfo getMatchingContainer(final long size,
while (iter.hasNext()) {
BlockContainerInfo info = iter.next();
if (info.getAllocated() < this.containerSize + size) {
if (info.getAllocated() + size <= this.containerSize) {
queue.remove(info);
info.addAllocated(size);
@@ -399,7 +427,23 @@ public BlockContainerInfo getMatchingContainer(final long size,
}
} finally {
writeLock.unlock();
lock.writeLock().unlock();
}
return null;
}
@VisibleForTesting
public List<BlockContainerInfo> getMatchingContainers(Owner owner,
ReplicationType type, ReplicationFactor factor, LifeCycleState state) {
ContainerKey key = new ContainerKey(owner, type, factor, state);
lock.readLock().lock();
try {
return Arrays.asList((BlockContainerInfo[]) containers.get(key)
.toArray(new BlockContainerInfo[0]));
} catch (Exception e) {
LOG.error("Could not get matching containers", e);
} finally {
lock.readLock().unlock();
}
return null;
}

View File

@@ -19,7 +19,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
import java.io.IOException;
@@ -40,7 +39,7 @@ public interface Mapping extends Closeable {
ContainerInfo getContainer(String containerName) throws IOException;
/**
* Returns pipelines under certain conditions.
* Returns containers under certain conditions.
* Search container names from start name(exclusive),
* and use prefix name to filter the result. The max
* size of the searching range cannot exceed the
@@ -52,11 +51,11 @@ public interface Mapping extends Closeable {
* Usually the count will be replace with a very big
* value instead of being unlimited in case the db is very big)
*
* @return a list of pipeline.
* @return a list of container.
* @throws IOException
*/
List<Pipeline> listContainer(String startName, String prefixName, int count)
throws IOException;
List<ContainerInfo> listContainer(String startName, String prefixName,
int count) throws IOException;
/**
* Allocates a new container for a given keyName and replication factor.

View File

@@ -74,6 +74,7 @@ public Pipeline getPipeline(String containerName, OzoneProtos
String pipelineName = "SA-" + UUID.randomUUID().toString().substring(3);
pipeline.setContainerName(containerName);
pipeline.setPipelineName(pipelineName);
pipeline.setFactor(replicationFactor);
LOG.info("Creating new standalone pipeline: {}", pipeline.toString());
return pipeline;
}

View File

@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.cblock.util;
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException;
@@ -68,7 +70,7 @@ public void deleteContainer(Pipeline pipeline, boolean force)
}
/**
* This is a mock class, so returns the pipelines of start container
* This is a mock class, so returns the container infos of start container
* and end container.
*
* @param startName start container name.
@@ -78,11 +80,18 @@ public void deleteContainer(Pipeline pipeline, boolean force)
* @throws IOException
*/
@Override
public List<Pipeline> listContainer(String startName,
public List<ContainerInfo> listContainer(String startName,
String prefixName, int count) throws IOException {
List<Pipeline> dataList = new ArrayList<>();
dataList.add(getContainer(startName));
return dataList;
List<ContainerInfo> containerList = new ArrayList<>();
ContainerDescriptor containerDescriptor =
ContainerLookUpService.lookUp(startName);
ContainerInfo container = new ContainerInfo.Builder()
.setContainerName(containerDescriptor.getContainerID())
.setPipeline(containerDescriptor.getPipeline())
.setState(OzoneProtos.LifeCycleState.ALLOCATED)
.build();
containerList.add(container);
return containerList;
}
/**

View File

@@ -24,7 +24,6 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.cli.ResultCode;
import org.apache.hadoop.ozone.scm.cli.SCMCLI;
@@ -34,20 +33,16 @@
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.Ignore;
import org.junit.rules.Timeout;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.CLOSED;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.OPEN;
@@ -56,11 +51,10 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertFalse;
/**
* This class tests the CLI of SCM.
*/
@Ignore("Ignoring to fix configurable pipeline, Will bring this back.")
public class TestSCMCli {
private static SCMCLI cli;
@@ -70,7 +64,7 @@ public class TestSCMCli {
storageContainerLocationClient;
private static StorageContainerManager scm;
private static ContainerManager containerManager;
private static ScmClient containerOperationClient;
private static ByteArrayOutputStream outContent;
private static PrintStream outStream;
@@ -86,18 +80,17 @@ public static void setup() throws Exception {
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
xceiverClientManager = new XceiverClientManager(conf);
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
ScmClient client = new ContainerOperationClient(
containerOperationClient = new ContainerOperationClient(
storageContainerLocationClient, new XceiverClientManager(conf));
outContent = new ByteArrayOutputStream();
outStream = new PrintStream(outContent);
errContent = new ByteArrayOutputStream();
errStream = new PrintStream(errContent);
cli = new SCMCLI(client, outStream, errStream);
cli = new SCMCLI(containerOperationClient, outStream, errStream);
scm = cluster.getStorageContainerManager();
containerManager = cluster.getDataNodes().get(0)
.getOzoneContainerManager().getContainerManager();
}
private int runCommandAndGetOutput(String[] cmd,
@@ -163,12 +156,12 @@ public void testDeleteContainer() throws Exception {
// ****************************************
// Create an non-empty container
containerName = "non-empty-container";
pipeline = scm.allocateContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE,
containerName);
containerData = new ContainerData(containerName, conf);
containerManager.createContainer(pipeline, containerData);
ContainerData cdata = containerManager.readContainer(containerName);
pipeline = containerOperationClient
.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
ContainerData cdata = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
KeyUtils.getDB(cdata, conf).put(containerName.getBytes(),
"someKey".getBytes());
Assert.assertTrue(containerExist(containerName));
@@ -184,7 +177,7 @@ public void testDeleteContainer() throws Exception {
Assert.assertTrue(containerExist(containerName));
// Close the container
containerManager.closeContainer(containerName);
containerOperationClient.closeContainer(pipeline);
// Gracefully delete a container should fail because it is not empty.
testErr = new ByteArrayOutputStream();
@@ -198,31 +191,29 @@ public void testDeleteContainer() throws Exception {
delCmd = new String[] {"-container", "-delete", "-c", containerName, "-f"};
exitCode = runCommandAndGetOutput(delCmd, out, null);
assertEquals("Expected success, found:", ResultCode.SUCCESS, exitCode);
Assert.assertFalse(containerExist(containerName));
assertFalse(containerExist(containerName));
// ****************************************
// 2. Test to delete an empty container.
// ****************************************
// Create an empty container
containerName = "empty-container";
pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
containerData = new ContainerData(containerName, conf);
containerManager.createContainer(pipeline, containerData);
containerManager.closeContainer(containerName);
pipeline = containerOperationClient
.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
containerOperationClient.closeContainer(pipeline);
Assert.assertTrue(containerExist(containerName));
// Successfully delete an empty container.
delCmd = new String[] {"-container", "-delete", "-c", containerName};
exitCode = runCommandAndGetOutput(delCmd, out, null);
assertEquals(ResultCode.SUCCESS, exitCode);
Assert.assertFalse(containerExist(containerName));
assertFalse(containerExist(containerName));
// After the container is deleted,
// a same name container can now be recreated.
pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
containerManager.createContainer(pipeline, containerData);
containerOperationClient.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
Assert.assertTrue(containerExist(containerName));
// ****************************************
@@ -269,10 +260,11 @@ public void testInfoContainer() throws Exception {
// Create an empty container.
cname = "ContainerTestInfo1";
Pipeline pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), cname);
ContainerData data = new ContainerData(cname, conf);
containerManager.createContainer(pipeline, data);
Pipeline pipeline = containerOperationClient
.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, cname);
ContainerData data = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
info = new String[]{"-container", "-info", "-c", cname};
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -290,12 +282,12 @@ public void testInfoContainer() throws Exception {
// Create an non-empty container
cname = "ContainerTestInfo2";
pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), cname);
data = new ContainerData(cname, conf);
containerManager.createContainer(pipeline, data);
KeyUtils.getDB(data, conf).put(cname.getBytes(),
"someKey".getBytes());
pipeline = containerOperationClient
.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, cname);
data = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
KeyUtils.getDB(data, conf).put(cname.getBytes(), "someKey".getBytes());
info = new String[]{"-container", "-info", "-c", cname};
exitCode = runCommandAndGetOutput(info, out, null);
@@ -309,46 +301,20 @@ public void testInfoContainer() throws Exception {
out.reset();
// Create a container with some meta data.
cname = "ContainerTestInfo3";
pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), cname);
data = new ContainerData(cname, conf);
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner", "bilbo");
containerManager.createContainer(pipeline, data);
KeyUtils.getDB(data, conf).put(cname.getBytes(),
"someKey".getBytes());
List<String> metaList = data.getAllMetadata().entrySet().stream()
.map(entry -> entry.getKey() + ":" + entry.getValue())
.collect(Collectors.toList());
String metadataStr = StringUtils.join(", ", metaList);
info = new String[]{"-container", "-info", "-c", cname};
exitCode = runCommandAndGetOutput(info, out, null);
assertEquals(ResultCode.SUCCESS, exitCode);
openStatus = data.isOpen() ? "OPEN" : "CLOSED";
expected = String.format(formatStr, cname, openStatus,
data.getDBPath(), data.getContainerPath(), metadataStr,
datanodeID.getHostName(), datanodeID.getHostName());
assertEquals(expected, out.toString());
out.reset();
// Close last container and test info again.
containerManager.closeContainer(cname);
containerOperationClient.closeContainer(pipeline);
info = new String[]{"-container", "-info", "-c", cname};
info = new String[] {"-container", "-info", "-c", cname};
exitCode = runCommandAndGetOutput(info, out, null);
assertEquals(ResultCode.SUCCESS, exitCode);
data = containerManager.readContainer(cname);
data = ContainerData
.getFromProtBuf(containerOperationClient.readContainer(pipeline), conf);
openStatus = data.isOpen() ? "OPEN" : "CLOSED";
expected = String.format(formatStrWithHash, cname, openStatus,
data.getHash(), data.getDBPath(), data.getContainerPath(),
metadataStr, datanodeID.getHostName(), datanodeID.getHostName());
"", datanodeID.getHostName(), datanodeID.getHostName());
assertEquals(expected, out.toString());
}
@@ -376,10 +342,8 @@ public void testListContainerCommand() throws Exception {
String prefix = "ContainerForTesting";
for (int index = 0; index < 20; index++) {
String containerName = String.format("%s%02d", prefix, index);
Pipeline pipeline = scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerName);
ContainerData data = new ContainerData(containerName, conf);
containerManager.createContainer(pipeline, data);
containerOperationClient.createContainer(xceiverClientManager.getType(),
OzoneProtos.ReplicationFactor.ONE, containerName);
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -517,6 +481,7 @@ public void testHelp() throws Exception {
String expected1 =
"usage: hdfs scm -container <commands> <options>\n" +
"where <commands> can be one of the following\n" +
" -close Close container\n" +
" -create Create container\n" +
" -delete Delete container\n" +
" -info Info container\n" +

View File

@@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
import org.junit.*;
import org.junit.rules.ExpectedException;
import java.io.IOException;
/**
* Tests for ContainerStateManager.
*/
public class TestContainerStateManager {
private OzoneConfiguration conf;
private MiniOzoneCluster cluster;
private XceiverClientManager xceiverClientManager;
private StorageContainerManager scm;
private Mapping scmContainerMapping;
private ContainerStateManager stateManager;
@Rule
public ExpectedException thrown = ExpectedException.none();
@Before
public void setup() throws IOException {
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
xceiverClientManager = new XceiverClientManager(conf);
scm = cluster.getStorageContainerManager();
scmContainerMapping = scm.getScmContainerManager();
stateManager = scmContainerMapping.getStateManager();
}
@After
public void cleanUp() {
if (cluster != null) {
cluster.shutdown();
cluster.close();
}
}
@Test
public void testAllocateContainer() throws IOException {
// Allocate a container and verify the container info
String container1 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1);
BlockContainerInfo info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(container1, info.getContainerName());
Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocated());
Assert.assertEquals(OzoneProtos.Owner.OZONE, info.getOwner());
Assert.assertEquals(xceiverClientManager.getType(),
info.getPipeline().getType());
Assert.assertEquals(xceiverClientManager.getFactor(),
info.getPipeline().getFactor());
Assert.assertEquals(OzoneProtos.LifeCycleState.ALLOCATED, info.getState());
// Check there are two containers in ALLOCATED state after allocation
String container2 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2);
int numContainers = stateManager
.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(2, numContainers);
}
@Test
public void testContainerStateManagerRestart() throws IOException {
// Allocate 5 containers in ALLOCATED state and 5 in CREATING state
String cname = "container" + RandomStringUtils.randomNumeric(5);
for (int i = 0; i < 10; i++) {
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), cname + i);
if (i >= 5) {
scm.getScmContainerManager().updateContainerState(cname + i,
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
}
}
// New instance of ContainerStateManager should load all the containers in
// container store.
ContainerStateManager stateManager =
new ContainerStateManager(conf, scmContainerMapping,
128 * OzoneConsts.MB);
int containers = stateManager
.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(5, containers);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(5, containers);
}
@Test
public void testGetMatchingContainer() throws IOException {
String container1 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
String container2 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2);
BlockContainerInfo info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(container1, info.getContainerName());
info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(null, info);
info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(container2, info.getContainerName());
scmContainerMapping.updateContainerState(container2,
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
scmContainerMapping.updateContainerState(container2,
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
info = stateManager
.getMatchingContainer(OzoneConsts.GB * 3, OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN);
Assert.assertEquals(container2, info.getContainerName());
}
@Test
public void testUpdateContainerState() throws IOException {
int containers = stateManager
.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(0, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> DELETING -> DELETED
String container1 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container1);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(1, containers);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(1, containers);
scmContainerMapping.updateContainerState(container1,
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.OPEN).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.DELETE);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETING).size();
Assert.assertEquals(1, containers);
scmContainerMapping
.updateContainerState(container1, OzoneProtos.LifeCycleEvent.CLEANUP);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETED).size();
Assert.assertEquals(1, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// DELETING
String container2 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container2);
scmContainerMapping.updateContainerState(container2,
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
scmContainerMapping
.updateContainerState(container2, OzoneProtos.LifeCycleEvent.TIMEOUT);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.DELETING).size();
Assert.assertEquals(1, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSED
String container3 = "container" + RandomStringUtils.randomNumeric(5);
scm.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), container3);
scmContainerMapping.updateContainerState(container3,
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
scmContainerMapping.updateContainerState(container3,
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
scmContainerMapping
.updateContainerState(container3, OzoneProtos.LifeCycleEvent.CLOSE);
containers = stateManager.getMatchingContainers(OzoneProtos.Owner.OZONE,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
OzoneProtos.LifeCycleState.CLOSED).size();
Assert.assertEquals(1, containers);
}
}