HDFS-16210. RBF: Add the option of refreshCallQueue to RouterAdmin (#3379)

This commit is contained in:
Symious 2021-09-09 09:57:27 +08:00 committed by GitHub
parent a186460004
commit c0890e6d04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 1 deletions

View File

@ -81,11 +81,15 @@
import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.RefreshRegistry; import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos; import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos;
import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB; import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ProxyUsers;
@ -102,7 +106,7 @@
* router. It is created, started, and stopped by {@link Router}. * router. It is created, started, and stopped by {@link Router}.
*/ */
public class RouterAdminServer extends AbstractService public class RouterAdminServer extends AbstractService
implements RouterAdminProtocol { implements RouterAdminProtocol, RefreshCallQueueProtocol {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(RouterAdminServer.class); LoggerFactory.getLogger(RouterAdminServer.class);
@ -197,8 +201,16 @@ public RouterAdminServer(Configuration conf, Router router)
GenericRefreshProtocolProtos.GenericRefreshProtocolService. GenericRefreshProtocolProtos.GenericRefreshProtocolService.
newReflectiveBlockingService(genericRefreshXlator); newReflectiveBlockingService(genericRefreshXlator);
RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator =
new RefreshCallQueueProtocolServerSideTranslatorPB(this);
BlockingService refreshCallQueueService =
RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService.
newReflectiveBlockingService(refreshCallQueueXlator);
DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
genericRefreshService, adminServer); genericRefreshService, adminServer);
DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, adminServer);
} }
/** /**
@ -764,4 +776,12 @@ public boolean refreshSuperUserGroupsConfiguration() throws IOException {
ProxyUsers.refreshSuperUserGroupsConfiguration(); ProxyUsers.refreshSuperUserGroupsConfiguration();
return true; return true;
} }
@Override // RefreshCallQueueProtocol
public void refreshCallQueue() throws IOException {
LOG.info("Refreshing call queue.");
Configuration configuration = new Configuration();
router.getRpcServer().getServer().refreshCallQueue(configuration);
}
} }

View File

@ -77,6 +77,8 @@
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB; import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -388,6 +390,8 @@ public int run(String[] argv) throws Exception {
exitCode = genericRefresh(argv, i); exitCode = genericRefresh(argv, i);
} else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) { } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
exitCode = refreshSuperUserGroupsConfiguration(); exitCode = refreshSuperUserGroupsConfiguration();
} else if ("-refreshCallQueue".equals(cmd)) {
exitCode = refreshCallQueue();
} else { } else {
throw new IllegalArgumentException("Unknown Command: " + cmd); throw new IllegalArgumentException("Unknown Command: " + cmd);
} }
@ -1258,6 +1262,39 @@ public int genericRefresh(String[] argv, int i) throws IOException {
} }
} }
/**
* Refresh Router's call Queue.
*
* @throws IOException if the operation was not successful.
*/
private int refreshCallQueue() throws IOException {
Configuration conf = getConf();
String hostport = getConf().getTrimmed(
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
// Create the client
Class<?> xface = RefreshCallQueueProtocolPB.class;
InetSocketAddress address = NetUtils.createSocketAddr(hostport);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine2.class);
RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)RPC.getProxy(
xface, RPC.getProtocolVersion(xface), address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf), 0);
int returnCode = -1;
try (RefreshCallQueueProtocolClientSideTranslatorPB xlator =
new RefreshCallQueueProtocolClientSideTranslatorPB(proxy)) {
xlator.refreshCallQueue();
System.out.println("Refresh call queue successfully for " + hostport);
returnCode = 0;
} catch (IOException ioe){
System.out.println("Refresh call queue unsuccessfully for " + hostport);
}
return returnCode;
}
/** /**
* Normalize a path for that filesystem. * Normalize a path for that filesystem.
* *

View File

@ -1740,6 +1740,15 @@ public void testErrorFaultTolerant() throws Exception {
assertEquals(0, ToolRunner.run(admin, argv)); assertEquals(0, ToolRunner.run(admin, argv));
} }
@Test
public void testRefreshCallQueue() throws Exception {
System.setOut(new PrintStream(out));
String[] argv = new String[]{"-refreshCallQueue"};
assertEquals(0, ToolRunner.run(admin, argv));
assertTrue(out.toString().contains("Refresh call queue successfully"));
}
private void addMountTable(String src, String nsId, String dst) private void addMountTable(String src, String nsId, String dst)
throws Exception { throws Exception {
String[] argv = new String[] {"-add", src, nsId, dst}; String[] argv = new String[] {"-add", src, nsId, dst};