HDFS-13013. Fix closeContainer API with the right container state change. Contributed by Xiaoyu Yao.

This commit is contained in:
Mukul Kumar Singh 2018-01-18 15:14:58 +05:30 committed by Owen O'Malley
parent c4b88454a7
commit 28b87af51b
9 changed files with 104 additions and 146 deletions

View File

@ -28,7 +28,7 @@
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -168,10 +168,10 @@ private void checkKeyLocationInfo(KsmKeyLocationInfo subKeyInfo)
if (subKeyInfo.getShouldCreateContainer()) {
try {
ContainerProtocolCalls.createContainer(xceiverClient, requestID);
scmClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
containerName,
NotifyObjectCreationStageRequestProto.Stage.complete);
scmClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerName, ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.complete);
} catch (StorageContainerException ex) {
if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
//container already exist, this should never happen

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
@ -122,15 +122,17 @@ public Pipeline createContainer(String containerId, String owner)
public void createContainer(String containerId, XceiverClientSpi client,
Pipeline pipeline) throws IOException {
String traceID = UUID.randomUUID().toString();
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
NotifyObjectCreationStageRequestProto.Stage.begin);
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.begin);
ContainerProtocolCalls.createContainer(client, traceID);
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.container,
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
NotifyObjectCreationStageRequestProto.Stage.complete);
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.complete);
// Let us log this info after we let SCM know that we have completed the
// creation state.
@ -162,18 +164,20 @@ private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
// 2. Talk to Datanodes to create the pipeline.
//
// 3. update SCM that pipeline creation was successful.
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.pipeline,
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.pipeline,
pipeline.getPipelineName(),
NotifyObjectCreationStageRequestProto.Stage.begin);
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.begin);
client.createPipeline(pipeline.getPipelineName(),
pipeline.getMachines());
storageContainerLocationClient.notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type.pipeline,
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.pipeline,
pipeline.getPipelineName(),
NotifyObjectCreationStageRequestProto.Stage.complete);
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.complete);
// TODO : Should we change the state on the client side ??
// That makes sense, but it is not needed for the client to work.
@ -358,10 +362,22 @@ public void closeContainer(Pipeline pipeline) throws IOException {
// Actually close the container on Datanode
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
String containerId = pipeline.getContainerName();
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.begin);
ContainerProtocolCalls.closeContainer(client, traceID);
// Notify SCM to close the container
String containerId = pipeline.getContainerName();
storageContainerLocationClient.closeContainer(containerId);
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.complete);
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);

View File

@ -21,7 +21,7 @@
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.scm.ScmInfo;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -94,11 +94,13 @@ OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses,
* Container will be in Operational state after that.
* @param type object type
* @param name object name
* @param op operation type (e.g., create, close, delete)
* @param stage creation stage
*/
void notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type type, String name,
NotifyObjectCreationStageRequestProto.Stage stage) throws IOException;
void notifyObjectStageChange(
ObjectStageChangeRequestProto.Type type, String name,
ObjectStageChangeRequestProto.Op op,
ObjectStageChangeRequestProto.Stage stage) throws IOException;
/**
* Creates a replication pipeline of a specified type.
@ -111,14 +113,6 @@ Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
throws IOException;
/**
* Clsoe a container.
*
* @param containerName the name of the container to close.
* @throws IOException
*/
void closeContainer(String containerName) throws IOException;
/**
* Returns information about SCM.
*

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.scm.ScmInfo;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
@ -38,7 +37,7 @@
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -216,23 +215,25 @@ public OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState>
* Notify from client that creates object on datanodes.
* @param type object type
* @param name object name
* @param op operation type (e.g., create, close, delete)
* @param stage object creation stage : begin/complete
*/
@Override
public void notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type type,
String name,
NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
public void notifyObjectStageChange(
ObjectStageChangeRequestProto.Type type, String name,
ObjectStageChangeRequestProto.Op op,
ObjectStageChangeRequestProto.Stage stage) throws IOException {
Preconditions.checkState(!Strings.isNullOrEmpty(name),
"Object name cannot be null or empty");
NotifyObjectCreationStageRequestProto request =
NotifyObjectCreationStageRequestProto.newBuilder()
ObjectStageChangeRequestProto request =
ObjectStageChangeRequestProto.newBuilder()
.setType(type)
.setName(name)
.setOp(op)
.setStage(stage)
.build();
try {
rpcProxy.notifyObjectCreationStage(NULL_RPC_CONTROLLER, request);
rpcProxy.notifyObjectStageChange(NULL_RPC_CONTROLLER, request);
} catch(ServiceException e){
throw ProtobufHelper.getRemoteException(e);
}
@ -274,21 +275,6 @@ public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType
}
}
@Override
public void closeContainer(String containerName) throws IOException {
Preconditions.checkState(!Strings.isNullOrEmpty(containerName),
"Container name cannot be null or empty");
CloseContainerRequestProto request = CloseContainerRequestProto
.newBuilder()
.setContainerName(containerName)
.build();
try {
rpcProxy.closeContainer(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public ScmInfo getScmInfo() throws IOException {
OzoneProtos.GetScmInfoRequestProto request =

View File

@ -65,14 +65,6 @@ message GetContainerResponseProto {
required hadoop.hdfs.ozone.Pipeline pipeline = 1;
}
message CloseContainerRequestProto {
required string containerName = 1;
}
message CloseContainerResponseProto {
}
message ListContainerRequestProto {
required uint32 count = 1;
optional string startName = 2;
@ -91,21 +83,27 @@ message DeleteContainerResponseProto {
// Empty response
}
message NotifyObjectCreationStageRequestProto {
message ObjectStageChangeRequestProto {
enum Type {
container = 1;
pipeline = 2;
}
// delete/copy operation may be added later
enum Op {
create = 1;
close = 2;
}
enum Stage {
begin = 1;
complete = 2;
}
required string name = 1;
required Type type = 2;
required Stage stage = 3;
required Op op= 3;
required Stage stage = 4;
}
message NotifyObjectCreationStageResponseProto {
message ObjectStageChangeResponseProto {
// Empty response
}
@ -188,14 +186,10 @@ service StorageContainerLocationProtocolService {
rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
/**
* Notify from client when begin or finish creating container or pipeline on datanodes.
* Notify from client when begin or finish container or pipeline operations on datanodes.
*/
rpc notifyObjectCreationStage(NotifyObjectCreationStageRequestProto) returns (NotifyObjectCreationStageResponseProto);
rpc notifyObjectStageChange(ObjectStageChangeRequestProto) returns (ObjectStageChangeResponseProto);
/**
* Close a container.
*/
rpc closeContainer(CloseContainerRequestProto) returns (CloseContainerResponseProto);
/*
* Apis that Manage Pipelines.
*

View File

@ -33,8 +33,6 @@
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
@ -43,8 +41,8 @@
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -166,30 +164,18 @@ public DeleteContainerResponseProto deleteContainer(
}
@Override
public NotifyObjectCreationStageResponseProto notifyObjectCreationStage(
RpcController controller, NotifyObjectCreationStageRequestProto request)
public ObjectStageChangeResponseProto notifyObjectStageChange(
RpcController controller, ObjectStageChangeRequestProto request)
throws ServiceException {
try {
impl.notifyObjectCreationStage(request.getType(), request.getName(),
request.getStage());
return NotifyObjectCreationStageResponseProto.newBuilder().build();
impl.notifyObjectStageChange(request.getType(), request.getName(),
request.getOp(), request.getStage());
return ObjectStageChangeResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public CloseContainerResponseProto closeContainer(
RpcController controller, CloseContainerRequestProto request)
throws ServiceException {
try {
impl.closeContainer(request.getContainerName());
return CloseContainerResponseProto.newBuilder().build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
@Override
public PipelineResponseProto allocatePipeline(
RpcController controller, PipelineRequestProto request)

View File

@ -68,7 +68,7 @@
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
@ -671,32 +671,43 @@ public OzoneProtos.NodePool queryNode(EnumSet<NodeState> nodeStatuses,
}
/**
* Notify from client when begin/finish creating container/pipeline objects
* on datanodes.
* Notify from client when begin/finish operation for container/pipeline
* objects on datanodes.
* @param type
* @param name
* @param op
* @param stage
*/
@Override
public void notifyObjectCreationStage(
NotifyObjectCreationStageRequestProto.Type type, String name,
NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
public void notifyObjectStageChange(
ObjectStageChangeRequestProto.Type type, String name,
ObjectStageChangeRequestProto.Op op,
ObjectStageChangeRequestProto.Stage stage) throws IOException {
if (type == NotifyObjectCreationStageRequestProto.Type.container) {
ContainerInfo info = scmContainerManager.getContainer(name);
LOG.info("Container {} current state {} new stage {}", name,
info.getState(), stage);
if (stage == NotifyObjectCreationStageRequestProto.Stage.begin) {
LOG.info("Object type {} name {} op {} new stage {}",
type, name, op, stage);
if (type == ObjectStageChangeRequestProto.Type.container) {
if (op == ObjectStageChangeRequestProto.Op.create) {
if (stage == ObjectStageChangeRequestProto.Stage.begin) {
scmContainerManager.updateContainerState(name,
OzoneProtos.LifeCycleEvent.CREATE);
} else {
scmContainerManager.updateContainerState(name,
OzoneProtos.LifeCycleEvent.CREATED);
}
} else if (type == NotifyObjectCreationStageRequestProto.Type.pipeline) {
// TODO: pipeline state update will be addressed in future patch.
} else if (op == ObjectStageChangeRequestProto.Op.close) {
if (stage == ObjectStageChangeRequestProto.Stage.begin) {
scmContainerManager.updateContainerState(name,
OzoneProtos.LifeCycleEvent.FINALIZE);
} else {
scmContainerManager.updateContainerState(name,
OzoneProtos.LifeCycleEvent.CLOSE);
}
}
} //else if (type == ObjectStageChangeRequestProto.Type.pipeline) {
// TODO: pipeline state update will be addressed in future patch.
//}
}
/**
* Creates a replication pipeline of a specified type.
@ -711,12 +722,6 @@ public Pipeline createReplicationPipeline(
return null;
}
@Override
public void closeContainer(String containerName) throws IOException {
checkAdminAccess();
scmContainerManager.closeContainer(containerName);
}
/**
* Queries a list of Node that match a set of statuses.
* <p>

View File

@ -255,25 +255,6 @@ public void deleteContainer(String containerName) throws IOException {
}
}
@Override
public void closeContainer(String containerName) throws IOException {
lock.lock();
try {
OzoneProtos.LifeCycleState newState =
updateContainerState(containerName, OzoneProtos.LifeCycleEvent.CLOSE);
if (newState != OzoneProtos.LifeCycleState.CLOSED) {
throw new SCMException(
"Failed to close container "
+ containerName
+ ", reason : container in state "
+ newState,
SCMException.ResultCodes.UNEXPECTED_CONTAINER_STATE);
}
} finally {
lock.unlock();
}
}
/** {@inheritDoc} Used by client to update container state on SCM. */
@Override
public OzoneProtos.LifeCycleState updateContainerState(
@ -313,6 +294,10 @@ public OzoneProtos.LifeCycleState updateContainerState(
containerLeaseManager.release(containerInfo);
break;
case FINALIZE:
// TODO: we don't need a lease manager here for closing as the
// container report will include the container state after HDFS-13008
// If a client failed to update the container close state, DN container
// report from 3 DNs will be used to close the container eventually.
break;
case CLOSE:
break;

View File

@ -83,14 +83,6 @@ ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
*/
void deleteContainer(String containerName) throws IOException;
/**
* Close a container.
*
* @param containerName - name of the container to close.
* @throws IOException
*/
void closeContainer(String containerName) throws IOException;
/**
* Update container state.
* @param containerName - Container Name