HDFS-11676. Ozone: SCM CLI: Implement close container command. Contributed by Chen Liang.

This commit is contained in:
Weiwei Yang 2017-09-12 11:12:08 +08:00 committed by Owen O'Malley
parent 0c17776646
commit 1087ce4cd2
16 changed files with 295 additions and 12 deletions

View File

@ -255,6 +255,49 @@ public Pipeline getContainer(String containerId) throws
return storageContainerLocationClient.getContainer(containerId);
}
/**
* Close a container.
*
* @param pipeline the container to be closed.
* @throws IOException
*/
@Override
public void closeContainer(Pipeline pipeline) throws IOException {
XceiverClientSpi client = null;
try {
LOG.debug("Close container {}", pipeline);
/*
TODO: two orders here, revisit this later:
1. close on SCM first, then on data node
2. close on data node first, then on SCM
with 1: if client failed after closing on SCM, then there is a
container SCM thinks as closed, but is actually open. Then SCM will no
longer allocate block to it, which is fine. But SCM may later try to
replicate this "closed" container, which I'm not sure is safe.
with 2: if client failed after close on datanode, then there is a
container SCM thinks as open, but is actually closed. Then SCM will still
try to allocate block to it. Which will fail when actually doing the
write. No more data can be written, but at least the correctness and
consistency of existing data will maintain.
For now, take the #2 way.
*/
// Actually close the container on Datanode
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls.closeContainer(client, traceID);
// Notify SCM to close the container
String containerId = pipeline.getContainerName();
storageContainerLocationClient.closeContainer(containerId);
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
}
}
}
/**
* Get the the current usage information.
* @param pipeline - Pipeline

View File

@ -54,6 +54,14 @@ public interface ScmClient {
*/
Pipeline getContainer(String containerId) throws IOException;
/**
* Close a container by name.
*
* @param pipeline the container to be closed.
* @throws IOException
*/
void closeContainer(Pipeline pipeline) throws IOException;
/**
* Deletes an existing container.
* @param pipeline - Pipeline that represents the container.

View File

@ -108,4 +108,12 @@ void notifyObjectCreationStage(
Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
throws IOException;
/**
* Clsoe a container.
*
* @param containerName the name of the container to close.
* @throws IOException
*/
void closeContainer(String containerName) throws IOException;
}

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
@ -269,6 +270,21 @@ public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType
}
}
@Override
public void closeContainer(String containerName) throws IOException {
Preconditions.checkState(!Strings.isNullOrEmpty(containerName),
"Container name cannot be null or empty");
CloseContainerRequestProto request = CloseContainerRequestProto
.newBuilder()
.setContainerName(containerName)
.build();
try {
rpcProxy.closeContainer(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;

View File

@ -279,6 +279,29 @@ public static void deleteContainer(XceiverClientSpi client,
validateContainerResponse(response);
}
/**
* Close a container.
*
* @param client
* @param traceID
* @throws IOException
*/
public static void closeContainer(XceiverClientSpi client, String traceID)
throws IOException {
ContainerProtos.CloseContainerRequestProto.Builder closeRequest =
ContainerProtos.CloseContainerRequestProto.newBuilder();
closeRequest.setPipeline(client.getPipeline().getProtobufMessage());
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(Type.CloseContainer);
request.setCloseContainer(closeRequest);
request.setTraceID(traceID);
ContainerCommandResponseProto response =
client.sendCommand(request.build());
validateContainerResponse(response);
}
/**
* readContainer call that gets meta data from an existing container.
*

View File

@ -64,6 +64,14 @@ message GetContainerResponseProto {
required hadoop.hdfs.ozone.Pipeline pipeline = 1;
}
message CloseContainerRequestProto {
required string containerName = 1;
}
message CloseContainerResponseProto {
}
message ListContainerRequestProto {
required uint32 count = 1;
optional string startName = 2;
@ -183,6 +191,10 @@ service StorageContainerLocationProtocolService {
*/
rpc notifyObjectCreationStage(NotifyObjectCreationStageRequestProto) returns (NotifyObjectCreationStageResponseProto);
/**
* Close a container.
*/
rpc closeContainer(CloseContainerRequestProto) returns (CloseContainerResponseProto);
/*
* Apis that Manage Pipelines.
*

View File

@ -31,6 +31,8 @@
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.CloseContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
@ -173,6 +175,18 @@ public NotifyObjectCreationStageResponseProto notifyObjectCreationStage(
}
}
@Override
public CloseContainerResponseProto closeContainer(
RpcController controller, CloseContainerRequestProto request)
throws ServiceException {
try {
impl.closeContainer(request.getContainerName());
return CloseContainerResponseProto.newBuilder().build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
@Override
public PipelineResponseProto allocatePipeline(
RpcController controller, PipelineRequestProto request)

View File

@ -398,6 +398,11 @@ public Pipeline getContainer(String containerName) throws IOException {
return scmContainerManager.getContainer(containerName).getPipeline();
}
@VisibleForTesting
ContainerInfo getContainerInfo(String containerName) throws IOException {
return scmContainerManager.getContainer(containerName);
}
/**
* {@inheritDoc}
*/
@ -487,6 +492,12 @@ public Pipeline createReplicationPipeline(
return null;
}
@Override
public void closeContainer(String containerName) throws IOException {
checkAdminAccess();
scmContainerManager.closeContainer(containerName);
}
/**
* Queries a list of Node that match a set of statuses.
* <p>

View File

@ -65,7 +65,7 @@
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
FAILED_TO_FIND_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
FAILED_TO_FIND_CONTAINER_WITH_SAPCE;
FAILED_TO_FIND_CONTAINER_WITH_SPACE;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
FAILED_TO_FIND_BLOCK;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
@ -384,7 +384,7 @@ public AllocatedBlock allocateBlock(final long size) throws IOException {
// now we should have some candidates in ALLOCATE state
if (candidates.size() == 0) {
throw new SCMException("Fail to find any container to allocate block "
+ "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
+ "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SPACE);
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.cli.container;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.hadoop.ozone.scm.cli.OzoneCommandHandler;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException;
import static org.apache.hadoop.ozone.scm.cli.SCMCLI.CMD_WIDTH;
import static org.apache.hadoop.ozone.scm.cli.SCMCLI.HELP_OP;
/**
* The handler of close container command.
*/
public class CloseContainerHandler extends OzoneCommandHandler {
public static final String CONTAINER_CLOSE = "close";
public static final String OPT_CONTAINER_NAME = "c";
@Override
public void execute(CommandLine cmd) throws IOException {
if (!cmd.hasOption(CONTAINER_CLOSE)) {
throw new IOException("Expecting container close");
}
if (!cmd.hasOption(OPT_CONTAINER_NAME)) {
displayHelp();
if (!cmd.hasOption(HELP_OP)) {
throw new IOException("Expecting container name");
} else {
return;
}
}
String containerName = cmd.getOptionValue(OPT_CONTAINER_NAME);
Pipeline pipeline = getScmClient().getContainer(containerName);
if (pipeline == null) {
throw new IOException("Cannot close an non-exist container "
+ containerName);
}
logOut("Closing container : %s.", containerName);
getScmClient().closeContainer(pipeline);
logOut("Container closed.");
}
@Override
public void displayHelp() {
Options options = new Options();
addOptions(options);
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp(CMD_WIDTH, "hdfs scm -container -close <option>",
"where <option> is", options, "");
}
public static void addOptions(Options options) {
Option containerNameOpt = new Option(OPT_CONTAINER_NAME,
true, "Specify container name");
options.addOption(containerNameOpt);
}
CloseContainerHandler(ScmClient client) {
super(client);
}
}

View File

@ -29,6 +29,7 @@
import static org.apache.hadoop.ozone.scm.cli.SCMCLI.CMD_WIDTH;
import static org.apache.hadoop.ozone.scm.cli.SCMCLI.HELP_OP;
import static org.apache.hadoop.ozone.scm.cli.container.CloseContainerHandler.CONTAINER_CLOSE;
import static org.apache.hadoop.ozone.scm.cli.container
.CreateContainerHandler.CONTAINER_CREATE;
import static org.apache.hadoop.ozone.scm.cli.container
@ -65,6 +66,8 @@ public void execute(CommandLine cmd) throws IOException {
handler = new InfoContainerHandler(getScmClient());
} else if (cmd.hasOption(CONTAINER_LIST)) {
handler = new ListContainerHandler(getScmClient());
} else if (cmd.hasOption(CONTAINER_CLOSE)) {
handler = new CloseContainerHandler(getScmClient());
}
// execute the sub command, throw exception if no sub command found
@ -101,12 +104,15 @@ private static void addCommandsOption(Options options) {
new Option(CONTAINER_DELETE, false, "Delete container");
Option listContainer =
new Option(CONTAINER_LIST, false, "List container");
Option closeContainer =
new Option(CONTAINER_CLOSE, false, "Close container");
options.addOption(createContainer);
options.addOption(deleteContainer);
options.addOption(infoContainer);
options.addOption(listContainer);
// TODO : add other options such as delete, close etc.
options.addOption(closeContainer);
// Every new option should add it's option here.
}
public static void addOptions(Options options) {
@ -116,6 +122,7 @@ public static void addOptions(Options options) {
DeleteContainerHandler.addOptions(options);
InfoContainerHandler.addOptions(options);
ListContainerHandler.addOptions(options);
// TODO : add other options such as delete, close etc.
CloseContainerHandler.addOptions(options);
// Every new option should add it's option here.
}
}

View File

@ -278,6 +278,22 @@ public void deleteContainer(String containerName) throws IOException {
}
}
@Override
public void closeContainer(String containerName) throws IOException {
lock.lock();
try {
OzoneProtos.LifeCycleState newState =
updateContainerState(containerName, OzoneProtos.LifeCycleEvent.CLOSE);
if (newState != OzoneProtos.LifeCycleState.CLOSED) {
throw new SCMException("Failed to close container " + containerName +
", reason : container in state " + newState,
SCMException.ResultCodes.UNEXPECTED_CONTAINER_STATE);
}
} finally {
lock.unlock();
}
}
/**
* {@inheritDoc}
* Used by client to update container state on SCM.

View File

@ -78,6 +78,14 @@ ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
*/
void deleteContainer(String containerName) throws IOException;
/**
* Close a container.
*
* @param containerName - name of the container to close.
* @throws IOException
*/
void closeContainer(String containerName) throws IOException;
/**
* Update container state.
* @param containerName - Container Name

View File

@ -109,9 +109,10 @@ public enum ResultCodes {
FAILED_TO_CHANGE_CONTAINER_STATE,
CONTAINER_EXISTS,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
FAILED_TO_FIND_CONTAINER_WITH_SPACE,
BLOCK_EXISTS,
FAILED_TO_FIND_BLOCK,
IO_EXCEPTION
IO_EXCEPTION,
UNEXPECTED_CONTAINER_STATE
}
}

View File

@ -110,6 +110,12 @@ public Pipeline getContainer(String containerId)
return ContainerLookUpService.lookUp(containerId).getPipeline();
}
@Override
public void closeContainer(Pipeline container) throws IOException {
// Do nothing, because the mock container does not have the notion of
// "open" and "close".
}
@Override
public long getContainerSize(Pipeline pipeline) throws IOException {
// just return a constant value for now

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.client.ContainerOperationClient;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.util.StringUtils;
@ -48,6 +49,9 @@
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.CLOSED;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.OPEN;
import static org.apache.hadoop.ozone.scm.cli.ResultCode.EXECUTION_ERROR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -174,7 +178,7 @@ public void testDeleteContainer() throws Exception {
testErr = new ByteArrayOutputStream();
ByteArrayOutputStream out = new ByteArrayOutputStream();
exitCode = runCommandAndGetOutput(delCmd, out, testErr);
assertEquals(ResultCode.EXECUTION_ERROR, exitCode);
assertEquals(EXECUTION_ERROR, exitCode);
assertTrue(testErr.toString()
.contains("Deleting an open container is not allowed."));
Assert.assertTrue(containerExist(containerName));
@ -185,7 +189,7 @@ public void testDeleteContainer() throws Exception {
// Gracefully delete a container should fail because it is not empty.
testErr = new ByteArrayOutputStream();
int exitCode2 = runCommandAndGetOutput(delCmd, out, testErr);
assertEquals(ResultCode.EXECUTION_ERROR, exitCode2);
assertEquals(EXECUTION_ERROR, exitCode2);
assertTrue(testErr.toString()
.contains("Container cannot be deleted because it is not empty."));
Assert.assertTrue(containerExist(containerName));
@ -228,7 +232,7 @@ public void testDeleteContainer() throws Exception {
delCmd = new String[] {"-container", "-delete", "-c", containerName};
testErr = new ByteArrayOutputStream();
exitCode = runCommandAndGetOutput(delCmd, out, testErr);
assertEquals(ResultCode.EXECUTION_ERROR, exitCode);
assertEquals(EXECUTION_ERROR, exitCode);
assertTrue(testErr.toString()
.contains("Specified key does not exist."));
}
@ -261,7 +265,7 @@ public void testInfoContainer() throws Exception {
String[] info = {"-container", "-info", cname};
int exitCode = runCommandAndGetOutput(info, null, null);
assertEquals("Expected Execution Error, Did not find that.",
ResultCode.EXECUTION_ERROR, exitCode);
EXECUTION_ERROR, exitCode);
// Create an empty container.
cname = "ContainerTestInfo1";
@ -384,7 +388,7 @@ public void testListContainerCommand() throws Exception {
// Test without -start, -prefix and -count
String[] args = new String[] {"-container", "-list"};
int exitCode = runCommandAndGetOutput(args, out, err);
assertEquals(ResultCode.EXECUTION_ERROR, exitCode);
assertEquals(EXECUTION_ERROR, exitCode);
assertTrue(err.toString()
.contains("Expecting container count"));
@ -395,7 +399,7 @@ public void testListContainerCommand() throws Exception {
args = new String[] {"-container", "-list",
"-start", prefix + 0, "-count", "-1"};
exitCode = runCommandAndGetOutput(args, out, err);
assertEquals(ResultCode.EXECUTION_ERROR, exitCode);
assertEquals(EXECUTION_ERROR, exitCode);
assertTrue(err.toString()
.contains("-count should not be negative"));
@ -468,6 +472,28 @@ public void testListContainerCommand() throws Exception {
assertTrue(out.toString().isEmpty());
}
@Test
public void testCloseContainer() throws Exception {
String containerName = "containerTestClose";
String[] args = {"-container", "-create", "-c", containerName};
assertEquals(ResultCode.SUCCESS, cli.run(args));
Pipeline container = scm.getContainer(containerName);
assertNotNull(container);
assertEquals(containerName, container.getContainerName());
ContainerInfo containerInfo = scm.getContainerInfo(containerName);
assertEquals(OPEN, containerInfo.getState());
String[] args1 = {"-container", "-close", "-c", containerName};
assertEquals(ResultCode.SUCCESS, cli.run(args1));
containerInfo = scm.getContainerInfo(containerName);
assertEquals(CLOSED, containerInfo.getState());
// closing this container again will trigger an error.
assertEquals(EXECUTION_ERROR, cli.run(args1));
}
@Test
public void testHelp() throws Exception {
// TODO : this test assertion may break for every new help entry added