YARN-8542. Added YARN service REST API to list containers.
Contributed by Chandni Singh
This commit is contained in:
parent
66e7a2c787
commit
292c9e017f
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
@ -807,12 +808,12 @@ private Service getServiceFromClient(UserGroupInformation ugi,
|
||||
});
|
||||
}
|
||||
|
||||
private Container[] getContainers(UserGroupInformation ugi,
|
||||
private ComponentContainers[] getContainers(UserGroupInformation ugi,
|
||||
String serviceName, List<String> componentNames, String version,
|
||||
List<ContainerState> containerStates) throws IOException,
|
||||
InterruptedException {
|
||||
return ugi.doAs((PrivilegedExceptionAction<Container[]>) () -> {
|
||||
Container[] result;
|
||||
return ugi.doAs((PrivilegedExceptionAction<ComponentContainers[]>) () -> {
|
||||
ComponentContainers[] result;
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
|
@ -192,6 +192,41 @@ paths:
|
||||
description: Unexpected error
|
||||
schema:
|
||||
$ref: '#/definitions/ServiceStatus'
|
||||
/app/v1/services/{service_name}/component-instances:
|
||||
get:
|
||||
summary: Get component instances.
|
||||
description: Get component instances which match the query params.
|
||||
parameters:
|
||||
- name: service_name
|
||||
in: path
|
||||
description: Service name
|
||||
required: true
|
||||
type: string
|
||||
- name: componentName
|
||||
in: query
|
||||
description: Component name. Multiple values are allowed.
|
||||
type: string
|
||||
- name: version
|
||||
in: query
|
||||
description: Version of the service.
|
||||
type: string
|
||||
- name: containerState
|
||||
in: query
|
||||
description: Container state. Multiple values are allowed.
|
||||
schema:
|
||||
$ref: '#/definitions/ContainerState'
|
||||
responses:
|
||||
200:
|
||||
description: Component instances.
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/definitions/ComponentContainers'
|
||||
404:
|
||||
description: Service does not exist
|
||||
default:
|
||||
description: Unexpected error
|
||||
type: string
|
||||
definitions:
|
||||
Service:
|
||||
description: a service resource has the following attributes.
|
||||
@ -594,6 +629,18 @@ definitions:
|
||||
keytab:
|
||||
type: string
|
||||
description: The URI of the kerberos keytab. Currently supports only files present on the bare host. URI starts with "file\://" - A path on the local host where the keytab is stored. It is assumed that admin pre-installs the keytabs on the local host before AM launches.
|
||||
|
||||
ComponentContainers:
|
||||
description: Containers of a component.
|
||||
required:
|
||||
- component_name
|
||||
properties:
|
||||
component_name:
|
||||
type: string
|
||||
description: Name of the component.
|
||||
containers:
|
||||
type: array
|
||||
description: Containers of the component.
|
||||
items:
|
||||
$ref: '#/definitions/Container'
|
||||
|
||||
|
||||
|
@ -47,7 +47,7 @@
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
|
||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
||||
@ -205,10 +205,11 @@ public CompInstancesUpgradeResponseProto upgrade(
|
||||
@Override
|
||||
public GetCompInstancesResponseProto getCompInstances(
|
||||
GetCompInstancesRequestProto request) throws IOException {
|
||||
List<Container> containers = FilterUtils.filterInstances(context, request);
|
||||
List<ComponentContainers> containers = FilterUtils.filterInstances(context,
|
||||
request);
|
||||
return GetCompInstancesResponseProto.newBuilder().setCompInstances(
|
||||
ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray(
|
||||
new Container[containers.size()]))).build();
|
||||
ServiceApiUtil.COMP_CONTAINERS_JSON_SERDE.toJson(containers.toArray(
|
||||
new ComponentContainers[containers.size()]))).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.service.api.records;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Containers of a component.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
@ApiModel(description = "Containers of a component.")
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class ComponentContainers {
|
||||
|
||||
private static final long serialVersionUID = -1456748479118874991L;
|
||||
|
||||
@JsonProperty("component_name")
|
||||
private String componentName;
|
||||
|
||||
@JsonProperty("containers")
|
||||
private List<Container> containers;
|
||||
|
||||
@ApiModelProperty(example = "null", required = true,
|
||||
value = "Name of the component.")
|
||||
public String getComponentName() {
|
||||
return componentName;
|
||||
}
|
||||
|
||||
public void setComponentName(String name) {
|
||||
this.componentName = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Name of the service component.
|
||||
**/
|
||||
public ComponentContainers name(String name) {
|
||||
this.componentName = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the containers of the component.
|
||||
*/
|
||||
@ApiModelProperty(example = "null", value = "Containers of the component.")
|
||||
public List<Container> getContainers() {
|
||||
return containers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the containers.
|
||||
* @param containers containers of the component.
|
||||
*/
|
||||
public void setContainers(List<Container> containers) {
|
||||
this.containers = containers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the containers.
|
||||
* @param compContainers containers of the component.
|
||||
*/
|
||||
public ComponentContainers containers(List<Container> compContainers) {
|
||||
this.containers = compContainers;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a container.
|
||||
* @param container container
|
||||
*/
|
||||
public void addContainer(Container container) {
|
||||
containers.add(container);
|
||||
}
|
||||
}
|
@ -69,6 +69,7 @@
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
|
||||
import org.apache.hadoop.yarn.service.ClientAMProtocol;
|
||||
import org.apache.hadoop.yarn.service.ServiceMaster;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
@ -407,14 +408,15 @@ public String getInstances(String appName,
|
||||
return result.getCompInstances();
|
||||
}
|
||||
|
||||
public Container[] getContainers(String appName, List<String> components,
|
||||
public ComponentContainers[] getContainers(String appName,
|
||||
List<String> components,
|
||||
String version, List<ContainerState> containerStates)
|
||||
throws IOException, YarnException {
|
||||
GetCompInstancesResponseProto result = filterContainers(appName, components,
|
||||
version, containerStates != null ? containerStates.stream()
|
||||
.map(Enum::toString).collect(Collectors.toList()) : null);
|
||||
|
||||
return ServiceApiUtil.CONTAINER_JSON_SERDE.fromJson(
|
||||
return ServiceApiUtil.COMP_CONTAINERS_JSON_SERDE.fromJson(
|
||||
result.getCompInstances());
|
||||
}
|
||||
|
||||
|
@ -21,10 +21,11 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol;
|
||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -36,9 +37,11 @@ public class FilterUtils {
|
||||
* @param context service context
|
||||
* @param filterReq filter request
|
||||
*/
|
||||
public static List<Container> filterInstances(ServiceContext context,
|
||||
public static List<ComponentContainers> filterInstances(
|
||||
ServiceContext context,
|
||||
ClientAMProtocol.GetCompInstancesRequestProto filterReq) {
|
||||
List<Container> results = new ArrayList<>();
|
||||
Map<String, ComponentContainers> containersByComp = new HashMap<>();
|
||||
|
||||
Map<ContainerId, ComponentInstance> instances =
|
||||
context.scheduler.getLiveInstances();
|
||||
|
||||
@ -72,10 +75,20 @@ public static List<Container> filterInstances(ServiceContext context,
|
||||
}
|
||||
|
||||
if (include) {
|
||||
results.add(instance.getContainerSpec());
|
||||
ComponentContainers compContainers =
|
||||
containersByComp.computeIfAbsent(instance.getCompName(),
|
||||
k -> {
|
||||
ComponentContainers result = new ComponentContainers();
|
||||
result.setContainers(new ArrayList<>());
|
||||
result.setComponentName(instance.getCompName());
|
||||
return result;
|
||||
});
|
||||
|
||||
compContainers.addContainer(instance.getContainerSpec());
|
||||
}
|
||||
}));
|
||||
|
||||
return results;
|
||||
List<ComponentContainers> result = new ArrayList<>();
|
||||
result.addAll(containersByComp.values());
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.registry.client.binding.RegistryUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
@ -79,6 +80,11 @@ public class ServiceApiUtil {
|
||||
new JsonSerDeser<>(Container[].class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
|
||||
public static final JsonSerDeser<ComponentContainers[]>
|
||||
COMP_CONTAINERS_JSON_SERDE = new JsonSerDeser<>(
|
||||
ComponentContainers[].class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
|
||||
public static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
|
||||
new JsonSerDeser<>(Component[].class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
|
@ -41,6 +41,7 @@
|
||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||
import org.apache.hadoop.yarn.service.ServiceTestUtils;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||
@ -143,9 +144,13 @@ public void testGetCompInstances() throws Exception {
|
||||
ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L);
|
||||
comp.addContainer(new Container().id(containerId.toString()));
|
||||
|
||||
Container[] containers = client.getContainers(service.getName(),
|
||||
Lists.newArrayList("compa"), "v1", null);
|
||||
Assert.assertEquals("num containers", 2, containers.length);
|
||||
ComponentContainers[] compContainers = client.getContainers(
|
||||
service.getName(), Lists.newArrayList("compa"), "v1", null);
|
||||
Assert.assertEquals("num comp", 1, compContainers.length);
|
||||
Assert.assertEquals("comp name", "compa",
|
||||
compContainers[0].getComponentName());
|
||||
Assert.assertEquals("num containers", 2,
|
||||
compContainers[0].getContainers().size());
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@ -239,17 +244,20 @@ static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule,
|
||||
GetCompInstancesRequestProto.class))).thenAnswer(
|
||||
(Answer<GetCompInstancesResponseProto>) invocation -> {
|
||||
|
||||
GetCompInstancesRequestProto req = (GetCompInstancesRequestProto)
|
||||
invocation.getArguments()[0];
|
||||
List<Container> containers = FilterUtils.filterInstances(
|
||||
client.context, req);
|
||||
GetCompInstancesResponseProto response =
|
||||
GetCompInstancesResponseProto.newBuilder().setCompInstances(
|
||||
ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(
|
||||
containers.toArray(new Container[containers.size()])))
|
||||
.build();
|
||||
client.proxyResponse = response;
|
||||
return response;
|
||||
GetCompInstancesRequestProto req = (GetCompInstancesRequestProto)
|
||||
invocation.getArguments()[0];
|
||||
|
||||
List<ComponentContainers> compContainers =
|
||||
FilterUtils.filterInstances(client.context, req);
|
||||
GetCompInstancesResponseProto response =
|
||||
GetCompInstancesResponseProto.newBuilder().setCompInstances(
|
||||
ServiceApiUtil.COMP_CONTAINERS_JSON_SERDE.toJson(
|
||||
compContainers.toArray(
|
||||
new ComponentContainers[compContainers.size()])))
|
||||
.build();
|
||||
|
||||
client.proxyResponse = response;
|
||||
return response;
|
||||
});
|
||||
|
||||
client.setFileSystem(rule.getFs());
|
||||
|
@ -20,11 +20,11 @@
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
|
||||
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
|
||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||
import org.apache.hadoop.yarn.service.ServiceTestUtils;
|
||||
import org.apache.hadoop.yarn.service.TestServiceManager;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
@ -42,20 +42,28 @@ public class TestFilterUtils {
|
||||
public void testNoFilter() throws Exception {
|
||||
GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder()
|
||||
.build();
|
||||
List<Container> containers = FilterUtils.filterInstances(
|
||||
List<ComponentContainers> compContainers = FilterUtils.filterInstances(
|
||||
new MockRunningServiceContext(rule,
|
||||
TestServiceManager.createBaseDef("service")), req);
|
||||
Assert.assertEquals("num containers", 4, containers.size());
|
||||
Assert.assertEquals("num comps", 2, compContainers.size());
|
||||
compContainers.forEach(item -> {
|
||||
Assert.assertEquals("num containers", 2, item.getContainers().size());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterWithComp() throws Exception {
|
||||
GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder()
|
||||
.addAllComponentNames(Lists.newArrayList("compa")).build();
|
||||
List<Container> containers = FilterUtils.filterInstances(
|
||||
List<ComponentContainers> compContainers = FilterUtils.filterInstances(
|
||||
new MockRunningServiceContext(rule,
|
||||
TestServiceManager.createBaseDef("service")), req);
|
||||
Assert.assertEquals("num containers", 2, containers.size());
|
||||
Assert.assertEquals("num comps", 1, compContainers.size());
|
||||
Assert.assertEquals("comp name", "compa",
|
||||
compContainers.get(0).getComponentName());
|
||||
|
||||
Assert.assertEquals("num containers", 2,
|
||||
compContainers.get(0).getContainers().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -66,18 +74,15 @@ public void testFilterWithVersion() throws Exception {
|
||||
GetCompInstancesRequestProto.newBuilder();
|
||||
|
||||
reqBuilder.setVersion("v2");
|
||||
Assert.assertEquals("num containers", 0,
|
||||
Assert.assertEquals("num comps", 0,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
|
||||
reqBuilder.addAllComponentNames(Lists.newArrayList("compa"))
|
||||
.setVersion("v1").build();
|
||||
|
||||
Assert.assertEquals("num containers", 2,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
|
||||
reqBuilder.setVersion("v2").build();
|
||||
Assert.assertEquals("num containers", 0,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).get(0)
|
||||
.getContainers().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -89,13 +94,17 @@ public void testFilterWithState() throws Exception {
|
||||
|
||||
reqBuilder.addAllContainerStates(Lists.newArrayList(
|
||||
ContainerState.READY.toString()));
|
||||
Assert.assertEquals("num containers", 4,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
List<ComponentContainers> compContainers = FilterUtils.filterInstances(sc,
|
||||
reqBuilder.build());
|
||||
Assert.assertEquals("num comps", 2, compContainers.size());
|
||||
compContainers.forEach(item -> {
|
||||
Assert.assertEquals("num containers", 2, item.getContainers().size());
|
||||
});
|
||||
|
||||
reqBuilder.clearContainerStates();
|
||||
reqBuilder.addAllContainerStates(Lists.newArrayList(
|
||||
ContainerState.STOPPED.toString()));
|
||||
Assert.assertEquals("num containers", 0,
|
||||
Assert.assertEquals("num comps", 0,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user