diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f032b4fb2f..252b7d5406 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -54,6 +54,9 @@ Release 2.7.0 - UNRELEASED YARN-2188. [YARN-1492] Client service for cache manager. (Chris Trezzo and Sangjin Lee via kasha) + YARN-2189. [YARN-1492] Admin service for cache manager. + (Chris Trezzo via kasha) + YARN-2765. Added leveldb-based implementation for RMStateStore. (Jason Lowe via jianhe) diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn b/hadoop-yarn-project/hadoop-yarn/bin/yarn index b98f344c85..dfa27e4166 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/yarn +++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn @@ -36,6 +36,7 @@ function hadoop_usage echo " resourcemanager -format-state-store deletes the RMStateStore" echo " rmadmin admin tools" echo " sharedcachemanager run the SharedCacheManager daemon" + echo " scmadmin SharedCacheManager admin tools" echo " timelineserver run the timeline server" echo " version print the version" echo " or" @@ -162,6 +163,10 @@ case "${COMMAND}" in CLASS='org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager' YARN_OPTS="$YARN_OPTS $YARN_SHAREDCACHEMANAGER_OPTS" ;; + scmadmin) + CLASS='org.apache.hadoop.yarn.client.SCMAdmin' + YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" + ;; version) CLASS=org.apache.hadoop.util.VersionInfo hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml index 5e2278d8a0..a763d395cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml @@ -97,6 +97,7 @@ application_history_client.proto server/application_history_server.proto client_SCM_protocol.proto + server/SCM_Admin_protocol.proto ${project.build.directory}/generated-sources/java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 41f85ef9e4..f0f88d8d69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1401,6 +1401,18 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_SCM_APP_CHECKER_CLASS = "org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker"; + /** The address of the SCM admin interface. */ + public static final String SCM_ADMIN_ADDRESS = + SHARED_CACHE_PREFIX + "admin.address"; + public static final int DEFAULT_SCM_ADMIN_PORT = 8047; + public static final String DEFAULT_SCM_ADMIN_ADDRESS = + "0.0.0.0:" + DEFAULT_SCM_ADMIN_PORT; + + /** Number of threads used to handle SCM admin interface. */ + public static final String SCM_ADMIN_CLIENT_THREAD_COUNT = + SHARED_CACHE_PREFIX + "admin.thread-count"; + public static final int DEFAULT_SCM_ADMIN_CLIENT_THREAD_COUNT = 1; + // In-memory SCM store configuration public static final String IN_MEMORY_STORE_PREFIX = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/SCMAdminProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/SCMAdminProtocol.java new file mode 100644 index 0000000000..5c791fab4e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/SCMAdminProtocol.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + *

+ * The protocol between administrators and the SharedCacheManager + *

+ */ +@Public +@Unstable +public interface SCMAdminProtocol { + /** + *

+ * The method used by administrators to ask SCM to run cleaner task right away + *

+ * + * @param request request SharedCacheManager to run a cleaner task + * @return SharedCacheManager returns an empty response + * on success and throws an exception on rejecting the request + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + public RunSharedCacheCleanerTaskResponse runCleanerTask( + RunSharedCacheCleanerTaskRequest request) throws YarnException, IOException; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/SCMAdminProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/SCMAdminProtocolPB.java new file mode 100644 index 0000000000..93a2c67fb9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/SCMAdminProtocolPB.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.SCMAdminProtocol.SCMAdminProtocolService; + +@Private +@Unstable +@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB", + protocolVersion = 1) +public interface SCMAdminProtocolPB extends + SCMAdminProtocolService.BlockingInterface { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RunSharedCacheCleanerTaskRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RunSharedCacheCleanerTaskRequest.java new file mode 100644 index 0000000000..5b4b110566 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RunSharedCacheCleanerTaskRequest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * The request from admin to ask the SharedCacheManager to run + * cleaner service right away. + *

+ * + *

+ * Currently, this is empty. + *

+ */ +@Public +@Unstable +public abstract class RunSharedCacheCleanerTaskRequest { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RunSharedCacheCleanerTaskResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RunSharedCacheCleanerTaskResponse.java new file mode 100644 index 0000000000..1d32d32a68 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RunSharedCacheCleanerTaskResponse.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * The response to admin from the SharedCacheManager when + * is asked to run the cleaner service. + *

+ * + *

+ * Currently, this is empty. + *

+ */ +@Public +@Unstable +public abstract class RunSharedCacheCleanerTaskResponse { + + /** + * Get whether or not the shared cache manager has accepted the request. + * Shared cache manager will reject the request if there is an ongoing task + * + * @return boolean True if the request has been accepted, false otherwise. + */ + @Public + @Unstable + public abstract boolean getAccepted(); + + /** + * Set whether or not the shared cache manager has accepted the request Shared + * cache manager will reject the request if there is an ongoing task + * + * @param b True if the request has been accepted, false otherwise. + */ + @Public + @Unstable + public abstract void setAccepted(boolean b); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/SCM_Admin_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/SCM_Admin_protocol.proto new file mode 100644 index 0000000000..4e46c57427 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/SCM_Admin_protocol.proto @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "SCMAdminProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_service_protos.proto"; + +service SCMAdminProtocolService { + rpc runCleanerTask (RunSharedCacheCleanerTaskRequestProto) returns (RunSharedCacheCleanerTaskResponseProto); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 10f5b9fe4c..94e73e1284 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -354,3 +354,14 @@ message ReservationDeleteRequestProto { message ReservationDeleteResponseProto { } + +////////////////////////////////////////////////////// +/////// SCM_Admin_Protocol ////////////////////////// +////////////////////////////////////////////////////// + +message RunSharedCacheCleanerTaskRequestProto { +} + +message RunSharedCacheCleanerTaskResponseProto { + optional bool accepted = 1; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/SCMAdmin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/SCMAdmin.java new file mode 100644 index 0000000000..1e45c5ad1a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/SCMAdmin.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.SCMAdminProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse; + +public class SCMAdmin extends Configured implements Tool { + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + public SCMAdmin() { + super(); + } + + public SCMAdmin(Configuration conf) { + super(conf); + } + + private static void printHelp(String cmd) { + String summary = "scmadmin is the command to execute shared cache manager" + + "administrative commands.\n" + + "The full syntax is: \n\n" + + "hadoop scmadmin" + + " [-runCleanerTask]" + + " [-help [cmd]]\n"; + + String runCleanerTask = + "-runCleanerTask: Run cleaner task right away.\n"; + + String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + + "\t\tis specified.\n"; + + if ("runCleanerTask".equals(cmd)) { + System.out.println(runCleanerTask); + } else if ("help".equals(cmd)) { + System.out.println(help); + } else { + System.out.println(summary); + System.out.println(runCleanerTask); + System.out.println(help); + System.out.println(); + ToolRunner.printGenericCommandUsage(System.out); + } + } + + /** + * Displays format of commands. + * @param cmd The command that is being executed. + */ + private static void printUsage(String cmd) { + if ("-runCleanerTask".equals(cmd)) { + System.err.println("Usage: yarn scmadmin" + " [-runCleanerTask]"); + } else { + System.err.println("Usage: yarn scmadmin"); + System.err.println(" [-runCleanerTask]"); + System.err.println(" [-help [cmd]]"); + System.err.println(); + ToolRunner.printGenericCommandUsage(System.err); + } + } + + protected SCMAdminProtocol createSCMAdminProtocol() throws IOException { + // Get the current configuration + final YarnConfiguration conf = new YarnConfiguration(getConf()); + + // Create the admin client + final InetSocketAddress addr = conf.getSocketAddr( + YarnConfiguration.SCM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_SCM_ADMIN_PORT); + final YarnRPC rpc = YarnRPC.create(conf); + SCMAdminProtocol scmAdminProtocol = + (SCMAdminProtocol) rpc.getProxy(SCMAdminProtocol.class, addr, conf); + return scmAdminProtocol; + } + + private int runCleanerTask() throws YarnException, IOException { + // run cleaner task right away + SCMAdminProtocol scmAdminProtocol = createSCMAdminProtocol(); + RunSharedCacheCleanerTaskRequest request = + recordFactory.newRecordInstance(RunSharedCacheCleanerTaskRequest.class); + RunSharedCacheCleanerTaskResponse response = + scmAdminProtocol.runCleanerTask(request); + if (response.getAccepted()) { + System.out.println("request accepted by shared cache manager"); + return 0; + } else { + System.out.println("request rejected by shared cache manager"); + return 1; + } + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 1) { + printUsage(""); + return -1; + } + + int i = 0; + String cmd = args[i++]; + + try { + if ("-runCleanerTask".equals(cmd)) { + if (args.length != 1) { + printUsage(cmd); + return -1; + } else { + return runCleanerTask(); + } + } else if ("-help".equals(cmd)) { + if (i < args.length) { + printUsage(args[i]); + } else { + printHelp(""); + } + return 0; + } else { + System.err.println(cmd.substring(1) + ": Unknown command"); + printUsage(""); + return -1; + } + + } catch (IllegalArgumentException arge) { + System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage()); + printUsage(cmd); + } catch (RemoteException e) { + // + // This is a error returned by hadoop server. Print + // out the first line of the error message, ignore the stack trace. + try { + String[] content; + content = e.getLocalizedMessage().split("\n"); + System.err.println(cmd.substring(1) + ": " + + content[0]); + } catch (Exception ex) { + System.err.println(cmd.substring(1) + ": " + + ex.getLocalizedMessage()); + } + } catch (Exception e) { + System.err.println(cmd.substring(1) + ": " + + e.getLocalizedMessage()); + } + return -1; + } + + public static void main(String[] args) throws Exception { + int result = ToolRunner.run(new SCMAdmin(), args); + System.exit(result); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMAdminProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMAdminProtocolPBClientImpl.java new file mode 100644 index 0000000000..4dd06d6e0b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/SCMAdminProtocolPBClientImpl.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.server.api.SCMAdminProtocol; +import org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; + +import com.google.protobuf.ServiceException; + +public class SCMAdminProtocolPBClientImpl implements SCMAdminProtocol, + Closeable { + + private SCMAdminProtocolPB proxy; + + public SCMAdminProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, SCMAdminProtocolPB.class, + ProtobufRpcEngine.class); + proxy = RPC.getProxy(SCMAdminProtocolPB.class, clientVersion, addr, conf); + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + + @Override + public RunSharedCacheCleanerTaskResponse runCleanerTask( + RunSharedCacheCleanerTaskRequest request) throws YarnException, + IOException { + YarnServiceProtos.RunSharedCacheCleanerTaskRequestProto requestProto = + ((RunSharedCacheCleanerTaskRequestPBImpl) request).getProto(); + try { + return new RunSharedCacheCleanerTaskResponsePBImpl(proxy.runCleanerTask(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMAdminProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMAdminProtocolPBServiceImpl.java new file mode 100644 index 0000000000..7d8c5782c7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/SCMAdminProtocolPBServiceImpl.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.yarn.server.api.SCMAdminProtocol; +import org.apache.hadoop.yarn.server.api.SCMAdminProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskResponseProto; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class SCMAdminProtocolPBServiceImpl implements SCMAdminProtocolPB { + + private SCMAdminProtocol real; + + public SCMAdminProtocolPBServiceImpl(SCMAdminProtocol impl) { + this.real = impl; + } + + @Override + public RunSharedCacheCleanerTaskResponseProto runCleanerTask(RpcController controller, + RunSharedCacheCleanerTaskRequestProto proto) throws ServiceException { + RunSharedCacheCleanerTaskRequestPBImpl request = + new RunSharedCacheCleanerTaskRequestPBImpl(proto); + try { + RunSharedCacheCleanerTaskResponse response = real.runCleanerTask(request); + return ((RunSharedCacheCleanerTaskResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RunSharedCacheCleanerTaskRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RunSharedCacheCleanerTaskRequestPBImpl.java new file mode 100644 index 0000000000..91d1b5cdef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RunSharedCacheCleanerTaskRequestPBImpl.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskRequestProto; + +public class RunSharedCacheCleanerTaskRequestPBImpl extends + RunSharedCacheCleanerTaskRequest { + RunSharedCacheCleanerTaskRequestProto proto = + RunSharedCacheCleanerTaskRequestProto.getDefaultInstance(); + RunSharedCacheCleanerTaskRequestProto.Builder builder = null; + boolean viaProto = false; + + public RunSharedCacheCleanerTaskRequestPBImpl() { + builder = RunSharedCacheCleanerTaskRequestProto.newBuilder(); + } + + public RunSharedCacheCleanerTaskRequestPBImpl( + RunSharedCacheCleanerTaskRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public RunSharedCacheCleanerTaskRequestProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RunSharedCacheCleanerTaskRequestProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RunSharedCacheCleanerTaskResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RunSharedCacheCleanerTaskResponsePBImpl.java new file mode 100644 index 0000000000..02c4f319a5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RunSharedCacheCleanerTaskResponsePBImpl.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.RunSharedCacheCleanerTaskResponseProtoOrBuilder; + +public class RunSharedCacheCleanerTaskResponsePBImpl extends + RunSharedCacheCleanerTaskResponse { + RunSharedCacheCleanerTaskResponseProto proto = + RunSharedCacheCleanerTaskResponseProto.getDefaultInstance(); + RunSharedCacheCleanerTaskResponseProto.Builder builder = null; + boolean viaProto = false; + + public RunSharedCacheCleanerTaskResponsePBImpl() { + builder = RunSharedCacheCleanerTaskResponseProto.newBuilder(); + } + + public RunSharedCacheCleanerTaskResponsePBImpl( + RunSharedCacheCleanerTaskResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public boolean getAccepted() { + RunSharedCacheCleanerTaskResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasAccepted()) ? p.getAccepted() : false; + } + + @Override + public void setAccepted(boolean b) { + maybeInitBuilder(); + builder.setAccepted(b); + } + + public RunSharedCacheCleanerTaskResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RunSharedCacheCleanerTaskResponseProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 54c4dbc9a0..73a6b5d55e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1431,6 +1431,18 @@ yarn.sharedcache.store.in-memory.check-period-mins 720 + + + The address of the admin interface in the SCM (shared cache manager) + yarn.sharedcache.admin.address + 0.0.0.0:8047 + + + + The number of threads used to handle SCM admin interface (1 by default) + yarn.sharedcache.admin.thread-count + 1 + The frequency at which a cleaner task runs. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SCMAdminProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SCMAdminProtocolService.java new file mode 100644 index 0000000000..3ecca02e73 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SCMAdminProtocolService.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.server.api.SCMAdminProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +/** + * This service handles all SCMAdminProtocol rpc calls from administrators + * to the shared cache manager. + */ +@Private +@Unstable +public class SCMAdminProtocolService extends AbstractService implements + SCMAdminProtocol { + + private static final Log LOG = LogFactory.getLog(SCMAdminProtocolService.class); + + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private Server server; + InetSocketAddress clientBindAddress; + private final CleanerService cleanerService; + private AccessControlList adminAcl; + + public SCMAdminProtocolService(CleanerService cleanerService) { + super(SCMAdminProtocolService.class.getName()); + this.cleanerService = cleanerService; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.clientBindAddress = getBindAddress(conf); + adminAcl = new AccessControlList(conf.get( + YarnConfiguration.YARN_ADMIN_ACL, + YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); + super.serviceInit(conf); + } + + InetSocketAddress getBindAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.SCM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_SCM_ADMIN_PORT); + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + this.server = + rpc.getServer(SCMAdminProtocol.class, this, + clientBindAddress, + conf, null, // Secret manager null for now (security not supported) + conf.getInt(YarnConfiguration.SCM_ADMIN_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_SCM_ADMIN_CLIENT_THREAD_COUNT)); + + // TODO: Enable service authorization (see YARN-2774) + + this.server.start(); + clientBindAddress = + conf.updateConnectAddr(YarnConfiguration.SCM_ADMIN_ADDRESS, + server.getListenerAddress()); + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.server != null) { + this.server.stop(); + } + + super.serviceStop(); + } + + private void checkAcls(String method) throws YarnException { + UserGroupInformation user; + try { + user = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + LOG.warn("Couldn't get current user", ioe); + throw RPCUtil.getRemoteException(ioe); + } + + if (!adminAcl.isUserAllowed(user)) { + LOG.warn("User " + user.getShortUserName() + " doesn't have permission" + + " to call '" + method + "'"); + + throw RPCUtil.getRemoteException( + new AccessControlException("User " + user.getShortUserName() + + " doesn't have permission" + " to call '" + method + "'")); + } + LOG.info("SCM Admin: " + method + " invoked by user " + + user.getShortUserName()); + } + + @Override + public RunSharedCacheCleanerTaskResponse runCleanerTask( + RunSharedCacheCleanerTaskRequest request) throws YarnException { + checkAcls("runCleanerTask"); + RunSharedCacheCleanerTaskResponse response = + recordFactory.newRecordInstance(RunSharedCacheCleanerTaskResponse.class); + this.cleanerService.runCleanerTask(); + // if we are here, then we have submitted the request to the cleaner + // service, ack the request to the admin client + response.setAccepted(true); + return response; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index c54e470dd5..5c33b2b8f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -74,6 +74,9 @@ protected void serviceInit(Configuration conf) throws Exception { ClientProtocolService cps = createClientProtocolService(store); addService(cps); + SCMAdminProtocolService saps = createSCMAdminProtocolService(cs); + addService(saps); + // init metrics DefaultMetricsSystem.initialize("SharedCacheManager"); JvmMetrics.initSingleton("SharedCacheManager", null); @@ -113,6 +116,11 @@ private ClientProtocolService createClientProtocolService(SCMStore store) { return new ClientProtocolService(store); } + private SCMAdminProtocolService createSCMAdminProtocolService( + CleanerService cleanerService) { + return new SCMAdminProtocolService(cleanerService); + } + @Override protected void serviceStop() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSCMAdminProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSCMAdminProtocolService.java new file mode 100644 index 0000000000..e6cf15fc3e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestSCMAdminProtocolService.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.server.api.SCMAdminProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RunSharedCacheCleanerTaskResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RunSharedCacheCleanerTaskResponsePBImpl; +import org.apache.hadoop.yarn.client.SCMAdmin; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Basic unit tests for the SCM Admin Protocol Service and SCMAdmin. + */ +public class TestSCMAdminProtocolService { + + static SCMAdminProtocolService service; + static SCMAdminProtocol SCMAdminProxy; + static SCMAdminProtocol mockAdmin; + static SCMAdmin adminCLI; + static SCMStore store; + static CleanerService cleaner; + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + @Before + public void startUp() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.SCM_STORE_CLASS, + InMemorySCMStore.class.getName()); + + cleaner = mock(CleanerService.class); + + service = spy(new SCMAdminProtocolService(cleaner)); + service.init(conf); + service.start(); + + YarnRPC rpc = YarnRPC.create(new Configuration()); + + InetSocketAddress scmAddress = + conf.getSocketAddr(YarnConfiguration.SCM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_SCM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_SCM_ADMIN_PORT); + + SCMAdminProxy = + (SCMAdminProtocol) rpc.getProxy(SCMAdminProtocol.class, scmAddress, + conf); + + mockAdmin = mock(SCMAdminProtocol.class); + adminCLI = new SCMAdmin(new Configuration()) { + @Override + protected SCMAdminProtocol createSCMAdminProtocol() throws IOException { + return mockAdmin; + } + }; + } + + @After + public void cleanUpTest() { + if (service != null) { + service.stop(); + } + + if (SCMAdminProxy != null) { + RPC.stopProxy(SCMAdminProxy); + } + } + + @Test + public void testRunCleanerTask() throws Exception { + doNothing().when(cleaner).runCleanerTask(); + RunSharedCacheCleanerTaskRequest request = + recordFactory.newRecordInstance(RunSharedCacheCleanerTaskRequest.class); + RunSharedCacheCleanerTaskResponse response = SCMAdminProxy.runCleanerTask(request); + Assert.assertTrue("cleaner task request isn't accepted", response.getAccepted()); + verify(service, times(1)).runCleanerTask(any(RunSharedCacheCleanerTaskRequest.class)); + } + + @Test + public void testRunCleanerTaskCLI() throws Exception { + String[] args = { "-runCleanerTask" }; + RunSharedCacheCleanerTaskResponse rp = + new RunSharedCacheCleanerTaskResponsePBImpl(); + rp.setAccepted(true); + when(mockAdmin.runCleanerTask(isA(RunSharedCacheCleanerTaskRequest.class))) + .thenReturn(rp); + assertEquals(0, adminCLI.run(args)); + rp.setAccepted(false); + when(mockAdmin.runCleanerTask(isA(RunSharedCacheCleanerTaskRequest.class))) + .thenReturn(rp); + assertEquals(1, adminCLI.run(args)); + verify(mockAdmin, times(2)).runCleanerTask( + any(RunSharedCacheCleanerTaskRequest.class)); + } +}