HDFS-14927. RBF: Add metrics for async callers thread pool. Contributed by Leon Gao.
This commit is contained in:
parent
02009c3bb7
commit
f18bbdd9d8
@ -93,4 +93,10 @@ public interface FederationRPCMBean {
|
||||
* @return JSON string representation.
|
||||
*/
|
||||
String getRpcClientConnections();
|
||||
|
||||
/**
|
||||
* Get the JSON representation of the async caller thread pool.
|
||||
* @return JSON string representation of the async caller thread pool.
|
||||
*/
|
||||
String getAsyncCallerPool();
|
||||
}
|
||||
|
@ -220,6 +220,11 @@ public String getRpcClientConnections() {
|
||||
return rpcServer.getRPCClient().getJSON();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAsyncCallerPool() {
|
||||
return rpcServer.getRPCClient().getAsyncCallerPoolJson();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the time to proxy an operation from the moment the Router sends it to
|
||||
* the Namenode until it replied.
|
||||
|
@ -69,6 +69,7 @@
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.eclipse.jetty.util.ajax.JSON;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -263,6 +264,19 @@ public String getJSON() {
|
||||
return this.connectionManager.getJSON();
|
||||
}
|
||||
|
||||
/**
|
||||
* JSON representation of the async caller thread pool.
|
||||
*
|
||||
* @return String representation of the JSON.
|
||||
*/
|
||||
public String getAsyncCallerPoolJson() {
|
||||
final Map<String, Integer> info = new LinkedHashMap<>();
|
||||
info.put("active", executorService.getActiveCount());
|
||||
info.put("total", executorService.getPoolSize());
|
||||
info.put("max", executorService.getMaximumPoolSize());
|
||||
return JSON.toString(info);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get ClientProtocol proxy client for a NameNode. Each combination of user +
|
||||
* NN must use a unique proxy client. Previously created clients are cached
|
||||
|
@ -31,6 +31,7 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
@ -48,6 +49,8 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.junit.After;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -356,4 +359,71 @@ public void testNoNamenodesAvailable() throws Exception{
|
||||
// Router 0 failures do not change
|
||||
assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncCallerPoolMetrics() throws Exception {
|
||||
setupCluster(true, false);
|
||||
simulateSlowNamenode(cluster.getCluster().getNameNode(0), 2);
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
// Set only one router to make test easier
|
||||
cluster.getRouters().remove(1);
|
||||
FederationRPCMetrics metrics = cluster.getRouters().get(0).getRouter()
|
||||
.getRpcServer().getRPCMetrics();
|
||||
|
||||
// No active connection initially
|
||||
Map<String, Integer> result = objectMapper
|
||||
.readValue(metrics.getAsyncCallerPool(), Map.class);
|
||||
assertEquals(0, result.get("active").intValue());
|
||||
assertEquals(0, result.get("total").intValue());
|
||||
assertEquals(4, result.get("max").intValue());
|
||||
|
||||
ExecutorService exec = Executors.newSingleThreadExecutor();
|
||||
|
||||
try {
|
||||
// Run a client request to create an active connection
|
||||
exec.submit(() -> {
|
||||
DFSClient routerClient = null;
|
||||
try {
|
||||
routerClient = new DFSClient(new URI("hdfs://fed"),
|
||||
cluster.getRouterClientConf());
|
||||
String clientName = routerClient.getClientName();
|
||||
ClientProtocol routerProto = routerClient.getNamenode();
|
||||
routerProto.renewLease(clientName);
|
||||
} catch (Exception e) {
|
||||
fail("Client request failed: " + e);
|
||||
} finally {
|
||||
if (routerClient != null) {
|
||||
try {
|
||||
routerClient.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot close the client");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for client request to be active
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
try {
|
||||
Map<String, Integer> newResult = objectMapper.readValue(
|
||||
metrics.getAsyncCallerPool(), Map.class);
|
||||
if (newResult.get("active") != 1) {
|
||||
return false;
|
||||
}
|
||||
if (newResult.get("max") != 4) {
|
||||
return false;
|
||||
}
|
||||
int total = newResult.get("total");
|
||||
// "total" is dynamic
|
||||
return total >= 1 && total <= 4;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Not able to parse metrics result: " + e);
|
||||
}
|
||||
return false;
|
||||
}, 100, 2000);
|
||||
} finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user