HDDS-733. Create container if not exist, as part of chunk write.

Contributed by Lokesh Jain.
Nanda kumar 2018-11-10 00:07:26 +05:30
parent a736b5da15
commit 9fe50b4991
30 changed files with 294 additions and 492 deletions

View File

@ -117,17 +117,7 @@ public ContainerWithPipeline createContainer(String owner)
public void createContainer(XceiverClientSpi client,
long containerId) throws IOException {
String traceID = UUID.randomUUID().toString();
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.begin);
ContainerProtocolCalls.createContainer(client, containerId, traceID);
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.complete);
// Let us log this info after we let SCM know that we have completed the
// creation state.
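
With the two SCM stage notifications gone, the client-side helper shrinks to a single datanode call. Reconstructed from the lines that remain in this hunk (logging elided):

    public void createContainer(XceiverClientSpi client,
        long containerId) throws IOException {
      String traceID = UUID.randomUUID().toString();
      ContainerProtocolCalls.createContainer(client, containerId, traceID);
      // the surrounding code then logs the successful creation
    }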

View File

@ -445,13 +445,11 @@ public ContainerInfo build() {
/**
* Check if a container is in open state, this will check if the
* container is either open, allocated, creating or closing.
* Any containers in these states is managed as an open container by SCM.
* container is either open or closing state. Any containers in these states
* is managed as an open container by SCM.
*/
public boolean isOpen() {
return state == HddsProtos.LifeCycleState.ALLOCATED ||
state == HddsProtos.LifeCycleState.CREATING ||
state == HddsProtos.LifeCycleState.OPEN ||
state == HddsProtos.LifeCycleState.CLOSING;
return state == HddsProtos.LifeCycleState.OPEN
|| state == HddsProtos.LifeCycleState.CLOSING;
}
}
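
With ALLOCATED and CREATING removed, only OPEN and CLOSING count as open. A hypothetical check of the new semantics, assuming a minimally populated ContainerInfo.Builder (used the same way elsewhere in this diff):

    ContainerInfo closing = new ContainerInfo.Builder()
        .setState(HddsProtos.LifeCycleState.CLOSING)
        .build();
    assert closing.isOpen();    // CLOSING is still managed as open by SCM

    ContainerInfo closed = new ContainerInfo.Builder()
        .setState(HddsProtos.LifeCycleState.CLOSED)
        .build();
    assert !closed.isOpen();    // CLOSED is not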

View File

@ -28,8 +28,6 @@
public final class AllocatedBlock {
private Pipeline pipeline;
private ContainerBlockID containerBlockID;
// Indicates whether the client should create container before writing block.
private boolean shouldCreateContainer;
/**
* Builder for AllocatedBlock.
@ -37,7 +35,6 @@ public final class AllocatedBlock {
public static class Builder {
private Pipeline pipeline;
private ContainerBlockID containerBlockID;
private boolean shouldCreateContainer;
public Builder setPipeline(Pipeline p) {
this.pipeline = p;
@ -49,22 +46,14 @@ public Builder setContainerBlockID(ContainerBlockID blockId) {
return this;
}
public Builder setShouldCreateContainer(boolean shouldCreate) {
this.shouldCreateContainer = shouldCreate;
return this;
}
public AllocatedBlock build() {
return new AllocatedBlock(pipeline, containerBlockID,
shouldCreateContainer);
return new AllocatedBlock(pipeline, containerBlockID);
}
}
private AllocatedBlock(Pipeline pipeline, ContainerBlockID containerBlockID,
boolean shouldCreateContainer) {
private AllocatedBlock(Pipeline pipeline, ContainerBlockID containerBlockID) {
this.pipeline = pipeline;
this.containerBlockID = containerBlockID;
this.shouldCreateContainer = shouldCreateContainer;
}
public Pipeline getPipeline() {
@ -74,8 +63,4 @@ public Pipeline getPipeline() {
public ContainerBlockID getBlockID() {
return containerBlockID;
}
public boolean getCreateContainer() {
return shouldCreateContainer;
}
}
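
Constructing an AllocatedBlock now needs only the pipeline and the block ID. A hypothetical usage of the trimmed builder (the pipeline, containerID and localID are assumed to come from SCM):

    AllocatedBlock block = new AllocatedBlock.Builder()
        .setPipeline(pipeline)
        .setContainerBlockID(new ContainerBlockID(containerID, localID))
        .build();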

View File

@ -104,8 +104,7 @@ public AllocatedBlock allocateBlock(long size,
AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
.setContainerBlockID(
ContainerBlockID.getFromProtobuf(response.getContainerBlockID()))
.setPipeline(Pipeline.getFromProtobuf(response.getPipeline()))
.setShouldCreateContainer(response.getCreateContainer());
.setPipeline(Pipeline.getFromProtobuf(response.getPipeline()));
return builder.build();
}

View File

@ -78,7 +78,6 @@ public AllocateScmBlockResponseProto allocateScmBlock(
AllocateScmBlockResponseProto.newBuilder()
.setContainerBlockID(allocatedBlock.getBlockID().getProtobuf())
.setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
.setCreateContainer(allocatedBlock.getCreateContainer())
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
.build();
} else {

View File

@ -105,8 +105,7 @@ message AllocateScmBlockResponseProto {
required Error errorCode = 1;
optional ContainerBlockID containerBlockID = 2;
optional hadoop.hdds.Pipeline pipeline = 3;
optional bool createContainer = 4;
optional string errorMessage = 5;
optional string errorMessage = 4;
}
/**

View File

@ -111,21 +111,33 @@ public ContainerCommandResponseProto dispatch(
ContainerCommandResponseProto responseProto = null;
long startTime = System.nanoTime();
ContainerProtos.Type cmdType = msg.getCmdType();
try {
long containerID = msg.getContainerID();
long containerID = msg.getContainerID();
metrics.incContainerOpsMetrics(cmdType);
metrics.incContainerOpsMetrics(cmdType);
if (cmdType != ContainerProtos.Type.CreateContainer) {
if (cmdType != ContainerProtos.Type.CreateContainer) {
container = getContainer(containerID);
if (container == null && (cmdType == ContainerProtos.Type.WriteChunk
|| cmdType == ContainerProtos.Type.PutSmallFile)) {
// If container does not exist, create one for WriteChunk and
// PutSmallFile request
createContainer(msg);
container = getContainer(containerID);
containerType = getContainerType(container);
} else {
if (!msg.hasCreateContainer()) {
return ContainerUtils.malformedRequest(msg);
}
containerType = msg.getCreateContainer().getContainerType();
}
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, msg);
// if container not found return error
if (container == null) {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
containerType = getContainerType(container);
} else {
if (!msg.hasCreateContainer()) {
return ContainerUtils.malformedRequest(msg);
}
containerType = msg.getCreateContainer().getContainerType();
}
// Small performance optimization. We check if the operation is of type
// write before trying to send CloseContainerAction.
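
Since the removed and added lines of the hunk above read interleaved, here is the post-patch prologue of dispatch() reconstructed from the added lines only:

    long containerID = msg.getContainerID();
    metrics.incContainerOpsMetrics(cmdType);
    if (cmdType != ContainerProtos.Type.CreateContainer) {
      container = getContainer(containerID);
      if (container == null && (cmdType == ContainerProtos.Type.WriteChunk
          || cmdType == ContainerProtos.Type.PutSmallFile)) {
        // If container does not exist, create one for WriteChunk and
        // PutSmallFile request
        createContainer(msg);
        container = getContainer(containerID);
      }
      // if container not found return error
      if (container == null) {
        StorageContainerException sce = new StorageContainerException(
            "ContainerID " + containerID + " does not exist",
            ContainerProtos.Result.CONTAINER_NOT_FOUND);
        return ContainerUtils.logAndReturnError(LOG, sce, msg);
      }
      containerType = getContainerType(container);
    } else {
      if (!msg.hasCreateContainer()) {
        return ContainerUtils.malformedRequest(msg);
      }
      containerType = msg.getCreateContainer().getContainerType();
    }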
@ -168,6 +180,32 @@ public ContainerCommandResponseProto dispatch(
}
}
/**
* Create a container using the input container request.
* @param containerRequest - the container request which requires container
* to be created.
*/
private void createContainer(ContainerCommandRequestProto containerRequest) {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto.newBuilder();
ContainerType containerType =
ContainerProtos.ContainerType.KeyValueContainer;
createRequest.setContainerType(containerType);
ContainerCommandRequestProto.Builder requestBuilder =
ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.CreateContainer)
.setContainerID(containerRequest.getContainerID())
.setCreateContainer(createRequest.build())
.setDatanodeUuid(containerRequest.getDatanodeUuid())
.setTraceID(containerRequest.getTraceID());
// TODO: Assuming the container type to be KeyValueContainer for now.
// We need to get container type from the containerRequest.
Handler handler = getHandler(containerType);
handler.handle(requestBuilder.build(), null);
}
/**
* If the container usage reaches the close threshold or the container is
* marked unhealthy we send Close ContainerAction to SCM.
@ -227,15 +265,8 @@ public void setScmId(String scmId) {
}
@VisibleForTesting
public Container getContainer(long containerID)
throws StorageContainerException {
Container container = containerSet.getContainer(containerID);
if (container == null) {
throw new StorageContainerException(
"ContainerID " + containerID + " does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
}
return container;
public Container getContainer(long containerID) {
return containerSet.getContainer(containerID);
}
private ContainerType getContainerType(Container container) {

View File

@ -24,6 +24,7 @@
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@ -56,12 +57,12 @@ public CloseContainerCommandHandler() {
* Handles a given SCM command.
*
* @param command - SCM Command
* @param container - Ozone Container.
* @param ozoneContainer - Ozone Container.
* @param context - Current Context.
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
public void handle(SCMCommand command, OzoneContainer container,
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
LOG.debug("Processing Close Container command.");
invocationCount++;
@ -74,8 +75,16 @@ public void handle(SCMCommand command, OzoneContainer container,
containerID = closeContainerProto.getContainerID();
// CloseContainer operation is idempotent, if the container is already
// closed, then do nothing.
if (!container.getContainerSet().getContainer(containerID)
.getContainerData().isClosed()) {
// TODO: Non-existent container should be handled properly
Container container =
ozoneContainer.getContainerSet().getContainer(containerID);
if (container == null) {
LOG.error("Container {} does not exist in datanode. "
+ "Container close failed.", containerID);
cmdExecuted = false;
return;
}
if (!container.getContainerData().isClosed()) {
LOG.debug("Closing container {}.", containerID);
HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
HddsProtos.ReplicationType replicationType =
@ -91,12 +100,12 @@ public void handle(SCMCommand command, OzoneContainer container,
request.setDatanodeUuid(
context.getParent().getDatanodeDetails().getUuidString());
// submit the close container request for the XceiverServer to handle
container.submitContainerRequest(
ozoneContainer.submitContainerRequest(
request.build(), replicationType, pipelineID);
// Since the container is closed, we trigger an ICR
IncrementalContainerReportProto icr = IncrementalContainerReportProto
.newBuilder()
.addReport(container.getContainerSet()
.addReport(ozoneContainer.getContainerSet()
.getContainer(containerID).getContainerReport())
.build();
context.addReport(icr);

View File

@ -100,6 +100,43 @@ public void testContainerCloseActionWhenFull() throws IOException {
}
@Test
public void testCreateContainerWithWriteChunk() throws IOException {
String testDir =
GenericTestUtils.getTempPath(TestHddsDispatcher.class.getSimpleName());
try {
UUID scmId = UUID.randomUUID();
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_DATANODE_DIR_KEY, testDir);
DatanodeDetails dd = randomDatanodeDetails();
ContainerSet containerSet = new ContainerSet();
VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf);
StateContext context = Mockito.mock(StateContext.class);
HddsDispatcher hddsDispatcher =
new HddsDispatcher(conf, containerSet, volumeSet, context);
hddsDispatcher.setScmId(scmId.toString());
ContainerCommandRequestProto writeChunkRequest =
getWriteChunkRequest(dd.getUuidString(), 1L, 1L);
// send read chunk request and make sure container does not exist
ContainerCommandResponseProto response =
hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest));
Assert.assertEquals(response.getResult(),
ContainerProtos.Result.CONTAINER_NOT_FOUND);
// send write chunk request without sending create container
response = hddsDispatcher.dispatch(writeChunkRequest);
// container should be created as part of write chunk request
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
// send read chunk request to read the chunk written above
response =
hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest));
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertEquals(response.getReadChunk().getData(),
writeChunkRequest.getWriteChunk().getData());
} finally {
FileUtils.deleteDirectory(new File(testDir));
}
}
// This method has to be removed once we move scm/TestUtils.java
// from server-scm project to container-service or to common project.
private static DatanodeDetails randomDatanodeDetails() {
@ -150,4 +187,27 @@ private ContainerCommandRequestProto getWriteChunkRequest(
.build();
}
/**
* Creates container read chunk request using input container write chunk
* request.
*
* @param writeChunkRequest - Input container write chunk request
* @return container read chunk request
*/
private ContainerCommandRequestProto getReadChunkRequest(
ContainerCommandRequestProto writeChunkRequest) {
WriteChunkRequestProto writeChunk = writeChunkRequest.getWriteChunk();
ContainerProtos.ReadChunkRequestProto.Builder readChunkRequest =
ContainerProtos.ReadChunkRequestProto.newBuilder()
.setBlockID(writeChunk.getBlockID())
.setChunkData(writeChunk.getChunkData());
return ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.ReadChunk)
.setContainerID(writeChunk.getBlockID().getContainerID())
.setTraceID(writeChunkRequest.getTraceID())
.setDatanodeUuid(writeChunkRequest.getDatanodeUuid())
.setReadChunk(readChunkRequest)
.build();
}
}

View File

@ -197,18 +197,10 @@ public AllocatedBlock allocateBlock(final long size,
/*
Here is the high level logic.
1. First we check if there are containers in ALLOCATED state, that is
SCM has allocated them in the SCM namespace but the corresponding
container has not been created in the Datanode yet. If we have any in
that state, we will return that to the client, which allows client to
finish creating those containers. This is a sort of greedy algorithm,
our primary purpose is to get as many containers as possible.
1. We try to find containers in open state.
2. If there are no allocated containers -- Then we find a Open container
that matches that pattern.
3. If both of them fail, the we will pre-allocate a bunch of containers
in SCM and try again.
2. If there are no containers in open state, then we will pre-allocate a
bunch of containers in SCM and try again.
TODO : Support random picking of two containers from the list. So we can
use different kind of policies.
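
A minimal self-contained sketch of this two-step flow, using hypothetical names and plain Java rather than the Ozone APIs:

    import java.util.Optional;

    final class OpenContainerAllocatorSketch {
      // Stand-ins for getMatchingContainerWithPipeline and
      // preAllocateContainers; the real signatures differ.
      Optional<Long> findMatchingOpenContainer() { return Optional.empty(); }
      void preAllocateContainers(int batchSize) { /* create OPEN containers */ }

      Long allocate() {
        Optional<Long> c = findMatchingOpenContainer(); // 1. try OPEN containers
        if (!c.isPresent()) {
          synchronized (this) {
            // Re-check under the lock: another thread may have
            // pre-allocated containers in the meantime.
            c = findMatchingOpenContainer();
            if (!c.isPresent()) {
              preAllocateContainers(4);                 // 2. allocate a batch
              c = findMatchingOpenContainer();          //    and try again
            }
          }
        }
        return c.orElse(null);                          // null: caller logs error
      }
    }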
@ -216,78 +208,42 @@ public AllocatedBlock allocateBlock(final long size,
ContainerWithPipeline containerWithPipeline;
// This is to optimize performance, if the below condition is evaluated
// to false, then we can be sure that there are no containers in
// ALLOCATED state.
// This can result in false positive, but it will never be false negative.
// How can this result in false positive? We check if there are any
// containers in ALLOCATED state, this check doesn't care about the
// USER of the containers. So there might be cases where a different
// USER has few containers in ALLOCATED state, which will result in
// false positive.
if (!containerManager.getContainers(HddsProtos.LifeCycleState.ALLOCATED)
.isEmpty()) {
// Since the above check can result in false positive, we have to do
// the actual check and find out if there are containers in ALLOCATED
// state matching our criteria.
synchronized (this) {
// Using containers from ALLOCATED state should be done within
// synchronized block (or) write lock. Since we already hold a
// read lock, we will end up in deadlock situation if we take
// write lock here.
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.ALLOCATED);
if (containerWithPipeline != null) {
containerManager.updateContainerState(
containerWithPipeline.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATE);
return newBlock(containerWithPipeline,
HddsProtos.LifeCycleState.ALLOCATED);
}
}
}
// Since we found no allocated containers that match our criteria, let us
// look for OPEN containers that match the criteria.
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.OPEN);
if (containerWithPipeline != null) {
return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.OPEN);
}
// We found neither ALLOCATED or OPEN Containers. This generally means
// We did not find OPEN Containers. This generally means
// that most of our containers are full or we have not allocated
// containers of the type and replication factor. So let us go and
// allocate some.
// Even though we have already checked the containers in ALLOCATED
// Even though we have already checked the containers in OPEN
// state, we have to check again as we only hold a read lock.
// Some other thread might have pre-allocated container in meantime.
synchronized (this) {
if (!containerManager.getContainers(HddsProtos.LifeCycleState.ALLOCATED)
.isEmpty()) {
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.ALLOCATED);
}
if (containerWithPipeline == null) {
preAllocateContainers(containerProvisionBatchSize,
type, factor, owner);
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.ALLOCATED);
}
if (containerWithPipeline == null) {
synchronized (this) {
if (!containerManager.getContainers(HddsProtos.LifeCycleState.OPEN)
.isEmpty()) {
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.OPEN);
}
if (containerWithPipeline != null) {
containerManager.updateContainerState(
containerWithPipeline.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATE);
return newBlock(containerWithPipeline,
HddsProtos.LifeCycleState.ALLOCATED);
if (containerWithPipeline == null) {
preAllocateContainers(containerProvisionBatchSize, type, factor,
owner);
containerWithPipeline = containerManager
.getMatchingContainerWithPipeline(size, owner, type, factor,
HddsProtos.LifeCycleState.OPEN);
}
}
}
if (containerWithPipeline != null) {
return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.OPEN);
}
// we have tried all strategies we know but somehow we are not able
// to get a container for this block. Log that info and return a null.
LOG.error(
@ -315,13 +271,10 @@ private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline,
long localID = UniqueId.next();
long containerID = containerInfo.getContainerID();
boolean createContainer = (state == HddsProtos.LifeCycleState.ALLOCATED);
AllocatedBlock.Builder abb =
new AllocatedBlock.Builder()
.setContainerBlockID(new ContainerBlockID(containerID, localID))
.setPipeline(containerWithPipeline.getPipeline())
.setShouldCreateContainer(createContainer);
.setPipeline(containerWithPipeline.getPipeline());
LOG.trace("New block allocated : {} Container ID: {}", localID,
containerID);
return abb.build();

View File

@ -54,12 +54,11 @@ public ContainerChillModeRule(Configuration conf,
containerMap = new ConcurrentHashMap<>();
if(containers != null) {
containers.forEach(c -> {
// Containers in ALLOCATED state should not be included while
// calculating the total number of containers here. They are not
// reported by DNs and hence should not affect the chill mode exit
// rule.
// TODO: There can be containers in OPEN state which were never
// created by the client. We are not considering these containers for
// now. These containers can be handled by tracking pipelines.
if (c != null && c.getState() != null &&
!c.getState().equals(HddsProtos.LifeCycleState.ALLOCATED)) {
!c.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
containerMap.put(c.getContainerID(), c);
}
});

View File

@ -74,19 +74,6 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
HddsProtos.LifeCycleState state = info.getState();
try {
switch (state) {
case ALLOCATED:
// We cannot close a container in ALLOCATED state, moving the
// container to CREATING state, this should eventually
// timeout and the container will be moved to DELETING state.
LOG.debug("Closing container #{} in {} state", containerID, state);
containerManager.updateContainerState(containerID,
HddsProtos.LifeCycleEvent.CREATE);
break;
case CREATING:
// We cannot close a container in CREATING state, it will eventually
// timeout and moved to DELETING state.
LOG.debug("Closing container {} in {} state", containerID, state);
break;
case OPEN:
containerManager.updateContainerState(containerID,
HddsProtos.LifeCycleEvent.FINALIZE);

View File

@ -138,7 +138,7 @@ public ContainerStateManager(final Configuration configuration) {
finalStates.add(LifeCycleState.CLOSED);
finalStates.add(LifeCycleState.DELETED);
this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
this.stateMachine = new StateMachine<>(LifeCycleState.OPEN,
finalStates);
initializeStateMachine();
@ -156,12 +156,6 @@ public ContainerStateManager(final Configuration configuration) {
*
* Event and State Transition Mapping:
*
* State: ALLOCATED ---------------> CREATING
* Event: CREATE
*
* State: CREATING ---------------> OPEN
* Event: CREATED
*
* State: OPEN ---------------> CLOSING
* Event: FINALIZE
*
@ -174,34 +168,20 @@ public ContainerStateManager(final Configuration configuration) {
* State: DELETING ----------------> DELETED
* Event: CLEANUP
*
* State: CREATING ---------------> DELETING
* Event: TIMEOUT
*
*
* Container State Flow:
*
* [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]------->[CLOSED]
*            (CREATE)     |    (CREATED)      (FINALIZE)     (CLOSE)    |
*                         |                                             |
*                         |                                             |
*                         |(TIMEOUT)                            (DELETE)|
*                         |                                             |
*                         +-------------> [DELETING] <------------------+
*                                             |
*                                             |
*                                    (CLEANUP)|
*                                             |
*                                         [DELETED]
*
* [OPEN]-------->[CLOSING]------->[CLOSED]
*          (FINALIZE)      (CLOSE)    |
*                                     |
*                                     |
*                             (DELETE)|
*                                     |
*                                     |
*                              [DELETING] ----------> [DELETED]
*                                         (CLEANUP)
*/
private void initializeStateMachine() {
stateMachine.addTransition(LifeCycleState.ALLOCATED,
LifeCycleState.CREATING,
LifeCycleEvent.CREATE);
stateMachine.addTransition(LifeCycleState.CREATING,
LifeCycleState.OPEN,
LifeCycleEvent.CREATED);
stateMachine.addTransition(LifeCycleState.OPEN,
LifeCycleState.CLOSING,
LifeCycleEvent.FINALIZE);
@ -214,10 +194,6 @@ private void initializeStateMachine() {
LifeCycleState.DELETING,
LifeCycleEvent.DELETE);
stateMachine.addTransition(LifeCycleState.CREATING,
LifeCycleState.DELETING,
LifeCycleEvent.TIMEOUT);
stateMachine.addTransition(LifeCycleState.DELETING,
LifeCycleState.DELETED,
LifeCycleEvent.CLEANUP);
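
The trimmed lifecycle is small enough to express as a plain transition table. A self-contained sketch assuming ordinary Java maps instead of Ozone's StateMachine class:

    import java.util.EnumMap;
    import java.util.Map;

    final class LifecycleSketch {
      enum State { OPEN, CLOSING, CLOSED, DELETING, DELETED }
      enum Event { FINALIZE, CLOSE, DELETE, CLEANUP }

      private static final Map<State, Map<Event, State>> TRANSITIONS =
          new EnumMap<>(State.class);
      static {
        add(State.OPEN, Event.FINALIZE, State.CLOSING);
        add(State.CLOSING, Event.CLOSE, State.CLOSED);
        add(State.CLOSED, Event.DELETE, State.DELETING);
        add(State.DELETING, Event.CLEANUP, State.DELETED);
      }

      private static void add(State from, Event event, State to) {
        TRANSITIONS.computeIfAbsent(from, s -> new EnumMap<>(Event.class))
            .put(event, to);
      }

      static State next(State current, Event event) {
        Map<Event, State> row = TRANSITIONS.get(current);
        State next = row == null ? null : row.get(event);
        if (next == null) {
          throw new IllegalStateException(
              "No transition from " + current + " on " + event);
        }
        return next;
      }
    }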
@ -262,7 +238,7 @@ ContainerInfo allocateContainer(final PipelineManager pipelineManager,
final long containerID = containerCount.incrementAndGet();
final ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(HddsProtos.LifeCycleState.ALLOCATED)
.setState(LifeCycleState.OPEN)
.setPipelineID(pipeline.getId())
.setUsedBytes(0)
.setNumberOfKeys(0)

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@ -37,9 +36,6 @@
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
@ -54,7 +50,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@ -82,7 +77,6 @@ public class SCMContainerManager implements ContainerManager {
private final MetadataStore containerStore;
private final PipelineManager pipelineManager;
private final ContainerStateManager containerStateManager;
private final LeaseManager<ContainerInfo> containerLeaseManager;
private final EventPublisher eventPublisher;
private final long size;
@ -122,14 +116,6 @@ public SCMContainerManager(final Configuration conf,
this.containerStateManager = new ContainerStateManager(conf);
this.eventPublisher = eventPublisher;
final long containerCreationLeaseTimeout = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
this.containerLeaseManager = new LeaseManager<>("ContainerCreation",
containerCreationLeaseTimeout);
this.containerLeaseManager.start();
loadExistingContainers();
}
@ -371,51 +357,31 @@ public HddsProtos.LifeCycleState updateContainerState(
private ContainerInfo updateContainerStateInternal(ContainerID containerID,
HddsProtos.LifeCycleEvent event) throws IOException {
// Refactor the below code for better clarity.
try {
final ContainerInfo info =
containerStateManager.getContainer(containerID);
switch (event) {
case CREATE:
// Acquire lease on container
Lease<ContainerInfo> containerLease =
containerLeaseManager.acquire(info);
// Register callback to be executed in case of timeout
containerLease.registerCallBack(() -> {
updateContainerState(containerID,
HddsProtos.LifeCycleEvent.TIMEOUT);
return null; });
break;
case CREATED:
// Release the lease on container
containerLeaseManager.release(info);
break;
case FINALIZE:
// TODO: we don't need a lease manager here for closing as the
// container report will include the container state after HDFS-13008
// If a client failed to update the container close state, DN container
// report from 3 DNs will be used to close the container eventually.
break;
case CLOSE:
break;
case UPDATE:
break;
case DELETE:
break;
case TIMEOUT:
break;
case CLEANUP:
break;
default:
throw new SCMException("Unsupported container LifeCycleEvent.",
FAILED_TO_CHANGE_CONTAINER_STATE);
}
// If the below updateContainerState call fails, we should revert the
// changes made in switch case.
// Like releasing the lease in case of BEGIN_CREATE.
return containerStateManager.updateContainerState(containerID, event);
} catch (LeaseException e) {
throw new IOException("Lease Exception.", e);
switch (event) {
case FINALIZE:
// TODO: we don't need a lease manager here for closing as the
// container report will include the container state after HDFS-13008
// If a client failed to update the container close state, DN container
// report from 3 DNs will be used to close the container eventually.
break;
case CLOSE:
break;
case UPDATE:
break;
case DELETE:
break;
case TIMEOUT:
break;
case CLEANUP:
break;
default:
throw new SCMException("Unsupported container LifeCycleEvent.",
FAILED_TO_CHANGE_CONTAINER_STATE);
}
// If the below updateContainerState call fails, we should revert the
// changes made in switch case.
// Like releasing the lease in case of BEGIN_CREATE.
return containerStateManager.updateContainerState(containerID, event);
}
@ -533,9 +499,6 @@ public void removeContainerReplica(final ContainerID containerID,
*/
@Override
public void close() throws IOException {
if (containerLeaseManager != null) {
containerLeaseManager.shutdown();
}
if (containerStateManager != null) {
containerStateManager.close();
}

View File

@ -257,30 +257,16 @@ public void notifyObjectStageChange(StorageContainerLocationProtocolProtos
if (type == StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Type.container) {
if (op == StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Op.create) {
.ObjectStageChangeRequestProto.Op.close) {
if (stage == StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Stage.begin) {
scm.getContainerManager().updateContainerState(
ContainerID.valueof(id), HddsProtos
.LifeCycleEvent.CREATE);
scm.getContainerManager()
.updateContainerState(ContainerID.valueof(id),
HddsProtos.LifeCycleEvent.FINALIZE);
} else {
scm.getContainerManager().updateContainerState(
ContainerID.valueof(id), HddsProtos
.LifeCycleEvent.CREATED);
}
} else {
if (op == StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Op.close) {
if (stage == StorageContainerLocationProtocolProtos
.ObjectStageChangeRequestProto.Stage.begin) {
scm.getContainerManager().updateContainerState(
ContainerID.valueof(id), HddsProtos
.LifeCycleEvent.FINALIZE);
} else {
scm.getContainerManager().updateContainerState(
ContainerID.valueof(id), HddsProtos
.LifeCycleEvent.CLOSE);
}
scm.getContainerManager()
.updateContainerState(ContainerID.valueof(id),
HddsProtos.LifeCycleEvent.CLOSE);
}
}
} // else if (type == ObjectStageChangeRequestProto.Type.pipeline) {
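
Only the close operation survives in the container branch; reconstructed from the added lines of this hunk:

    if (op == StorageContainerLocationProtocolProtos
        .ObjectStageChangeRequestProto.Op.close) {
      if (stage == StorageContainerLocationProtocolProtos
          .ObjectStageChangeRequestProto.Stage.begin) {
        scm.getContainerManager()
            .updateContainerState(ContainerID.valueof(id),
                HddsProtos.LifeCycleEvent.FINALIZE);
      } else {
        scm.getContainerManager()
            .updateContainerState(ContainerID.valueof(id),
                HddsProtos.LifeCycleEvent.CLOSE);
      }
    }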

View File

@ -94,7 +94,7 @@ public void testChillModeExitRule() throws Exception {
// Assign closed state to containers to be included in the chill mode
// container list
for (ContainerInfo container : containers) {
container.setState(HddsProtos.LifeCycleState.OPEN);
container.setState(HddsProtos.LifeCycleState.CLOSED);
}
scmChillModeManager = new SCMChillModeManager(config, containers, queue);
queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
@ -140,13 +140,13 @@ public void testContainerChillModeRule() throws Exception {
containers = new ArrayList<>();
// Add 100 containers to the list of containers in SCM
containers.addAll(HddsTestUtils.getContainerInfo(25 * 4));
// Assign OPEN state to first 25 containers and ALLOCATED state to rest
// Assign CLOSED state to first 25 containers and OPEN state to rest
// of the containers
for (ContainerInfo container : containers.subList(0, 25)) {
container.setState(HddsProtos.LifeCycleState.OPEN);
container.setState(HddsProtos.LifeCycleState.CLOSED);
}
for (ContainerInfo container : containers.subList(25, 100)) {
container.setState(HddsProtos.LifeCycleState.ALLOCATED);
container.setState(HddsProtos.LifeCycleState.OPEN);
}
scmChillModeManager = new SCMChillModeManager(config, containers, queue);
@ -154,9 +154,9 @@ public void testContainerChillModeRule() throws Exception {
scmChillModeManager);
assertTrue(scmChillModeManager.getInChillMode());
// When 10 OPEN containers are reported by DNs, the computed container
// threshold should be 10/25 as there are only 25 open containers.
// Containers in ALLOCATED state should not contribute towards list of
// When 10 CLOSED containers are reported by DNs, the computed container
// threshold should be 10/25 as there are only 25 CLOSED containers.
// Containers in OPEN state should not contribute towards list of
// containers while calculating container threshold in SCMChillNodeManager
testContainerThreshold(containers.subList(0, 10), 0.4);
assertTrue(scmChillModeManager.getInChillMode());
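
Spelling out the threshold arithmetic from the comment above (hypothetical standalone computation): 100 containers are tracked, of which only the 25 CLOSED ones count for the rule, so reporting 10 of them gives

    double containerThreshold = 10 / 25.0;   // = 0.4, as asserted above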

View File

@ -39,7 +39,6 @@
import java.io.File;
import java.io.IOException;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CREATED;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
@ -112,8 +111,6 @@ public void testCloseContainerEventWithInvalidContainer() {
@Test
public void testCloseContainerEventWithValidContainers() throws IOException {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(CloseContainerEventHandler.LOG);
ContainerWithPipeline containerWithPipeline = containerManager
.allocateContainer(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, "ozone");
@ -124,16 +121,6 @@ public void testCloseContainerEventWithValidContainers() throws IOException {
int closeCount = nodeManager.getCommandCount(datanode);
eventQueue.fireEvent(CLOSE_CONTAINER, id);
eventQueue.processAll(1000);
// At this point of time, the allocated container is not in open
// state, so firing close container event should not queue CLOSE
// command in the Datanode
Assert.assertEquals(0, nodeManager.getCommandCount(datanode));
//Execute these state transitions so that we can close the container.
containerManager.updateContainerState(id, CREATED);
eventQueue.fireEvent(CLOSE_CONTAINER,
new ContainerID(
containerWithPipeline.getContainerInfo().getContainerID()));
eventQueue.processAll(1000);
Assert.assertEquals(closeCount + 1,
nodeManager.getCommandCount(datanode));
Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,
@ -165,8 +152,6 @@ public void testCloseContainerEventWithRatis() throws IOException {
Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details));
i++;
}
//Execute these state transitions so that we can close the container.
containerManager.updateContainerState(id, CREATED);
eventQueue.fireEvent(CLOSE_CONTAINER, id);
eventQueue.processAll(1000);
i = 0;

View File

@ -167,10 +167,6 @@ public void testGetContainerWithPipeline() throws Exception {
.setHostName("host2")
.setIpAddress("2.2.2.2")
.setUuid(UUID.randomUUID().toString()).build();
containerManager
.updateContainerState(contInfo.containerID(), LifeCycleEvent.CREATE);
containerManager.updateContainerState(contInfo.containerID(),
LifeCycleEvent.CREATED);
containerManager.updateContainerState(contInfo.containerID(),
LifeCycleEvent.FINALIZE);
containerManager
@ -218,26 +214,6 @@ public void testgetNoneExistentContainer() {
}
}
@Test
public void testContainerCreationLeaseTimeout() throws IOException,
InterruptedException {
nodeManager.setChillmode(false);
ContainerWithPipeline containerInfo = containerManager.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(),
containerOwner);
containerManager.updateContainerState(containerInfo.getContainerInfo()
.containerID(), HddsProtos.LifeCycleEvent.CREATE);
Thread.sleep(TIMEOUT + 1000);
thrown.expect(IOException.class);
thrown.expectMessage("Lease Exception");
containerManager
.updateContainerState(containerInfo.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATED);
}
@Test
public void testCloseContainer() throws IOException {
ContainerID id = createContainer().containerID();
@ -260,10 +236,6 @@ private ContainerInfo createContainer()
.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
containerManager.updateContainerState(containerInfo.containerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager.updateContainerState(containerInfo.containerID(),
HddsProtos.LifeCycleEvent.CREATED);
return containerInfo;
}

View File

@ -29,7 +29,6 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
@ -142,21 +141,6 @@ public void testOnMessage() throws IOException, NodeNotFoundException {
ContainerInfo container3 =
TestUtils.allocateContainer(containerManager);
containerManager.updateContainerState(
container1.containerID(), HddsProtos.LifeCycleEvent.CREATE);
containerManager.updateContainerState(
container1.containerID(), HddsProtos.LifeCycleEvent.CREATED);
containerManager.updateContainerState(
container2.containerID(), HddsProtos.LifeCycleEvent.CREATE);
containerManager.updateContainerState(
container2.containerID(), HddsProtos.LifeCycleEvent.CREATED);
containerManager.updateContainerState(
container3.containerID(), HddsProtos.LifeCycleEvent.CREATE);
containerManager.updateContainerState(
container3.containerID(), HddsProtos.LifeCycleEvent.CREATED);
registerReplicas(datanode1, container1, container2);
registerReplicas(datanode2, container1, container3);
@ -268,10 +252,6 @@ public void testOnMessageReplicaFailure() throws Exception {
ContainerInfo container1 =
TestUtils.allocateContainer(containerManager);
containerManager.updateContainerState(
container1.containerID(), HddsProtos.LifeCycleEvent.CREATE);
containerManager.updateContainerState(
container1.containerID(), HddsProtos.LifeCycleEvent.CREATED);
TestUtils.closeContainer(containerManager, container1.containerID());
deadNodeHandler.onMessage(dn1, eventQueue);

View File

@ -21,7 +21,6 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.retry.RetryPolicy;
@ -29,7 +28,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@ -126,7 +124,6 @@ public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
for (ChunkOutputStreamEntry streamEntry : streamEntries) {
OmKeyLocationInfo info =
new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
.setShouldCreateContainer(false)
.setLength(streamEntry.currentPosition).setOffset(0)
.build();
locationInfoList.add(info);
@ -180,41 +177,17 @@ public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
// equals to open session version)
for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) {
if (subKeyInfo.getCreateVersion() == openVersion) {
checkKeyLocationInfo(subKeyInfo);
addKeyLocationInfo(subKeyInfo);
}
}
}
private void checkKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
throws IOException {
ContainerWithPipeline containerWithPipeline = scmClient
.getContainerWithPipeline(subKeyInfo.getContainerID());
ContainerInfo container = containerWithPipeline.getContainerInfo();
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(containerWithPipeline.getPipeline());
// create container if needed
if (subKeyInfo.getShouldCreateContainer()) {
try {
ContainerProtocolCalls.createContainer(xceiverClient,
container.getContainerID(), requestID);
scmClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
subKeyInfo.getContainerID(),
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.complete);
} catch (StorageContainerException ex) {
if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
//container already exist, this should never happen
LOG.debug("Container {} already exists.",
container.getContainerID());
} else {
LOG.error("Container creation failed for {}.",
container.getContainerID(), ex);
throw ex;
}
}
}
streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
chunkSize, subKeyInfo.getLength()));
@ -479,7 +452,7 @@ private long getKeyLength() {
*/
private void allocateNewBlock(int index) throws IOException {
OmKeyLocationInfo subKeyInfo = omClient.allocateBlock(keyArgs, openID);
checkKeyLocationInfo(subKeyInfo);
addKeyLocationInfo(subKeyInfo);
}
@Override

View File

@ -25,17 +25,14 @@
*/
public final class OmKeyLocationInfo {
private final BlockID blockID;
private final boolean shouldCreateContainer;
// the id of this subkey in all the subkeys.
private long length;
private final long offset;
// the version number indicating when this block was added
private long createVersion;
private OmKeyLocationInfo(BlockID blockID, boolean shouldCreateContainer,
long length, long offset) {
private OmKeyLocationInfo(BlockID blockID, long length, long offset) {
this.blockID = blockID;
this.shouldCreateContainer = shouldCreateContainer;
this.length = length;
this.offset = offset;
}
@ -60,10 +57,6 @@ public long getLocalID() {
return blockID.getLocalID();
}
public boolean getShouldCreateContainer() {
return shouldCreateContainer;
}
public long getLength() {
return length;
}
@ -85,7 +78,6 @@ public long getBlockCommitSequenceId() {
*/
public static class Builder {
private BlockID blockID;
private boolean shouldCreateContainer;
private long length;
private long offset;
@ -94,11 +86,6 @@ public Builder setBlockID(BlockID blockId) {
return this;
}
public Builder setShouldCreateContainer(boolean create) {
this.shouldCreateContainer = create;
return this;
}
public Builder setLength(long len) {
this.length = len;
return this;
@ -110,15 +97,13 @@ public Builder setOffset(long off) {
}
public OmKeyLocationInfo build() {
return new OmKeyLocationInfo(blockID,
shouldCreateContainer, length, offset);
return new OmKeyLocationInfo(blockID, length, offset);
}
}
public KeyLocation getProtobuf() {
return KeyLocation.newBuilder()
.setBlockID(blockID.getProtobuf())
.setShouldCreateContainer(shouldCreateContainer)
.setLength(length)
.setOffset(offset)
.setCreateVersion(createVersion)
@ -128,7 +113,6 @@ public KeyLocation getProtobuf() {
public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
OmKeyLocationInfo info = new OmKeyLocationInfo(
BlockID.getFromProtobuf(keyLocation.getBlockID()),
keyLocation.getShouldCreateContainer(),
keyLocation.getLength(),
keyLocation.getOffset());
info.setCreateVersion(keyLocation.getCreateVersion());
@ -139,7 +123,6 @@ public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
public String toString() {
return "{blockID={containerID=" + blockID.getContainerID() +
", localID=" + blockID.getLocalID() + "}" +
", shouldCreateContainer=" + shouldCreateContainer +
", length=" + length +
", offset=" + offset +
", createVersion=" + createVersion + '}';

View File

@ -247,7 +247,6 @@ message KeyArgs {
message KeyLocation {
required hadoop.hdds.BlockID blockID = 1;
required bool shouldCreateContainer = 2;
required uint64 offset = 3;
required uint64 length = 4;
// indicated at which version this block gets created.

View File

@ -77,14 +77,12 @@ public void cleanUp() {
public void testAllocateContainer() throws IOException {
// Allocate a container and verify the container info
ContainerWithPipeline container1 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
ContainerStateManager stateManager = new ContainerStateManager(conf);
ContainerInfo info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
HddsProtos.LifeCycleState.OPEN);
Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID());
Assert.assertEquals(containerOwner, info.getOwner());
@ -92,7 +90,7 @@ public void testAllocateContainer() throws IOException {
info.getReplicationType());
Assert.assertEquals(xceiverClientManager.getFactor(),
info.getReplicationFactor());
Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState());
Assert.assertEquals(HddsProtos.LifeCycleState.OPEN, info.getState());
// Check there are two containers in OPEN state after allocation
ContainerWithPipeline container2 = scm.getClientProtocolServer()
@ -102,7 +100,7 @@ public void testAllocateContainer() throws IOException {
int numContainers = containerStateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED).size();
HddsProtos.LifeCycleState.OPEN).size();
Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
container2.getContainerInfo().getContainerID());
Assert.assertEquals(2, numContainers);
@ -122,7 +120,7 @@ public void testContainerStateManagerRestart()
if (i >= 5) {
scm.getContainerManager().updateContainerState(container
.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATE);
HddsProtos.LifeCycleEvent.FINALIZE);
}
}
@ -139,7 +137,7 @@ public void testContainerStateManagerRestart()
.filter(info ->
info.getReplicationFactor() == xceiverClientManager.getFactor())
.filter(info ->
info.getState() == HddsProtos.LifeCycleState.ALLOCATED)
info.getState() == HddsProtos.LifeCycleState.OPEN)
.count();
Assert.assertEquals(5, matchCount);
matchCount = result.stream()
@ -150,7 +148,7 @@ public void testContainerStateManagerRestart()
.filter(info ->
info.getReplicationFactor() == xceiverClientManager.getFactor())
.filter(info ->
info.getState() == HddsProtos.LifeCycleState.CREATING)
info.getState() == HddsProtos.LifeCycleState.CLOSING)
.count();
Assert.assertEquals(5, matchCount);
}
@ -160,16 +158,6 @@ public void testGetMatchingContainer() throws IOException {
ContainerWithPipeline container1 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containerManager
.updateContainerState(container1.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(container1.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATED);
ContainerWithPipeline container2 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
ContainerInfo info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
@ -178,22 +166,18 @@ public void testGetMatchingContainer() throws IOException {
Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID());
ContainerWithPipeline container2 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
HddsProtos.LifeCycleState.OPEN);
// space has already been allocated in container1, now container 2 should
// be chosen.
Assert.assertEquals(container2.getContainerInfo().getContainerID(),
info.getContainerID());
containerManager
.updateContainerState(container2.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(container2.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATED);
// now we have to get container1
info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
@ -208,32 +192,16 @@ public void testUpdateContainerState() throws IOException {
NavigableSet<ContainerID> containerList = containerStateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
HddsProtos.LifeCycleState.OPEN);
int containers = containerList == null ? 0 : containerList.size();
Assert.assertEquals(0, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// Allocate container1 and update its state from
// OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
ContainerWithPipeline container1 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(1, containers);
containerManager
.updateContainerState(container1.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(1, containers);
containerManager
.updateContainerState(container1.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATED);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN).size();
@ -271,35 +239,12 @@ public void testUpdateContainerState() throws IOException {
HddsProtos.LifeCycleState.DELETED).size();
Assert.assertEquals(1, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// DELETING
ContainerWithPipeline container2 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containerManager
.updateContainerState(container2.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(container2.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.TIMEOUT);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.DELETING).size();
Assert.assertEquals(1, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// Allocate container1 and update its state from
// OPEN -> CLOSING -> CLOSED
ContainerWithPipeline container3 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containerManager
.updateContainerState(container3.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(container3.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.CREATED);
containerManager
.updateContainerState(container3.getContainerInfo().containerID(),
HddsProtos.LifeCycleEvent.FINALIZE);

View File

@ -101,10 +101,6 @@ public void testPipelineMap() throws IOException {
// Now close the container and it should not show up while fetching
// containers by pipeline
containerManager
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATED);
containerManager
.updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
containerManager

View File

@ -94,10 +94,6 @@ public void testPipelineCloseWithClosedContainer() throws IOException {
// Now close the container and it should not show up while fetching
// containers by pipeline
containerManager
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATED);
containerManager
.updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
containerManager
@ -128,10 +124,6 @@ public void testPipelineCloseWithOpenContainer() throws IOException,
Assert.assertEquals(1, setOpen.size());
ContainerID cId2 = ratisContainer2.getContainerInfo().containerID();
containerManager
.updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATED);
pipelineManager.finalizePipeline(ratisContainer2.getPipeline().getId());
Assert.assertEquals(
pipelineManager.getPipeline(ratisContainer2.getPipeline().getId())

View File

@ -23,6 +23,9 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.hdds.scm.container.common.helpers.
StorageContainerException;
@ -49,6 +52,7 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.event.Level;
@ -58,6 +62,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* Tests Close Container Exception handling by Ozone Client.
@ -207,9 +212,9 @@ public void testMultiBlockWrites() throws Exception {
createKey(keyName, ReplicationType.STAND_ALONE, (4 * blockSize));
ChunkGroupOutputStream groupOutputStream =
(ChunkGroupOutputStream) key.getOutputStream();
// With the initial size provided, it should have preallocated 3 blocks
// With the initial size provided, it should have preallocated 4 blocks
Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
// write data more than 1 chunk
// write data for 3 blocks and 1 more chunk
byte[] data = fixedLengthString(keyString, (3 * blockSize)).getBytes();
Assert.assertEquals(data.length, 3 * blockSize);
key.write(data);
@ -257,7 +262,8 @@ public void testMultiBlockWrites2() throws Exception {
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
// With the initial size provided, it should have pre allocated 4 blocks
Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
String dataString = fixedLengthString(keyString, (3 * blockSize));
String dataString =
fixedLengthString(keyString, (3 * blockSize + chunkSize));
byte[] data = dataString.getBytes();
key.write(data);
// 3 block are completely written to the DataNode in 3 blocks.
@ -283,8 +289,8 @@ public void testMultiBlockWrites2() throws Exception {
// closeContainerException and remaining data in the chunkOutputStream
// buffer will be copied into a different allocated block and will be
// committed.
Assert.assertEquals(4, keyLocationInfos.size());
dataLength = 3 * blockSize + (long) (0.5 * chunkSize);
Assert.assertEquals(5, keyLocationInfos.size());
dataLength = 3 * blockSize + (long) (1.5 * chunkSize);
Assert.assertEquals(dataLength, keyInfo.getDataSize());
validateData(keyName, dataString.concat(dataString2).getBytes());
}
@ -355,12 +361,22 @@ private void waitForContainerClose(String keyName,
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
List<Long> containerIdList = new ArrayList<>();
List<Pipeline> pipelineList = new ArrayList<>();
for (OmKeyLocationInfo info : locationInfoList) {
containerIdList.add(info.getContainerID());
}
Assert.assertTrue(!containerIdList.isEmpty());
waitForContainerClose(type, containerIdList.toArray(new Long[0]));
}
private void waitForContainerClose(HddsProtos.ReplicationType type,
Long... containerIdList)
throws ContainerNotFoundException, PipelineNotFoundException,
TimeoutException, InterruptedException {
List<Pipeline> pipelineList = new ArrayList<>();
for (long containerID : containerIdList) {
cluster.getStorageContainerManager().getEventQueue()
.fireEvent(SCMEvents.CLOSE_CONTAINER,
ContainerID.valueof(containerID));
Pipeline pipeline =
cluster.getStorageContainerManager().getContainerManager()
.getContainerWithPipeline(ContainerID.valueof(containerID))
@ -380,18 +396,28 @@ private void waitForContainerClose(String keyName,
for (long containerID : containerIdList) {
Pipeline pipeline = pipelineList.get(index);
List<DatanodeDetails> datanodes = pipeline.getNodes();
for (DatanodeDetails datanodeDetails : datanodes) {
GenericTestUtils.waitFor(() -> ContainerTestHelper
.isContainerClosed(cluster, containerID, datanodeDetails), 500,
15 * 1000);
//double check if it's really closed (waitFor also throws an exception)
Assert.assertTrue(ContainerTestHelper
.isContainerClosed(cluster, containerID, datanodeDetails));
// Below condition avoids the case where container has been allocated
// but not yet been used by the client. In such a case container is never
// created.
if (datanodes.stream().anyMatch(dn -> ContainerTestHelper
.isContainerPresent(cluster, containerID, dn))) {
for (DatanodeDetails datanodeDetails : datanodes) {
GenericTestUtils.waitFor(() -> ContainerTestHelper
.isContainerClosed(cluster, containerID, datanodeDetails),
500, 15 * 1000);
//double check if it's really closed
// (waitFor also throws an exception)
Assert.assertTrue(ContainerTestHelper
.isContainerClosed(cluster, containerID, datanodeDetails));
}
}
index++;
}
}
@Ignore // test needs to be fixed after close container is handled for
// non-existent containers on datanode. Test closes pre allocated containers
// on the datanode.
@Test
public void testDiscardPreallocatedBlocks() throws Exception {
String keyName = "discardpreallocatedblocks";

View File

@ -118,6 +118,9 @@ public void testContainerStateMachineFailures() throws Exception {
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis", 1024, ReplicationType.RATIS,
ReplicationFactor.ONE);
// First write and flush creates a container in the datanode
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());
//get the name of a valid container
@ -139,7 +142,8 @@ public void testContainerStateMachineFailures() throws Exception {
.getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
.getContainerPath()));
try {
// flush will throw an exception
// flush will throw an exception for the second write as the container
// dir has been deleted.
key.flush();
Assert.fail("Expected exception not thrown");
} catch (IOException ioe) {

View File

@ -616,4 +616,20 @@ public static boolean isContainerClosed(MiniOzoneCluster cluster,
}
return false;
}
public static boolean isContainerPresent(MiniOzoneCluster cluster,
long containerID, DatanodeDetails datanode) {
for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) {
if (datanode.equals(datanodeService.getDatanodeDetails())) {
Container container =
datanodeService.getDatanodeStateMachine().getContainer()
.getContainerSet().getContainer(containerID);
if (container != null) {
return true;
}
}
}
return false;
}
}

View File

@ -171,7 +171,6 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
}
OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setLength(scmBlockSize)
.setOffset(0)
.build();
@ -235,7 +234,6 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
}
OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setLength(allocateSize)
.setOffset(0)
.build();

View File

@ -122,8 +122,7 @@ public AllocatedBlock allocateBlock(long size,
AllocatedBlock.Builder abb =
new AllocatedBlock.Builder()
.setContainerBlockID(new ContainerBlockID(containerID, localID))
.setPipeline(pipeline)
.setShouldCreateContainer(false);
.setPipeline(pipeline);
return abb.build();
}