From a33ef4fd311784dc15401eb54c82e78528c4f961 Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Thu, 24 Jan 2019 18:43:21 -0500 Subject: [PATCH] YARN-8867. Added resource localization status to YARN service status call. Contributed by Chandni Singh --- .../app/launcher/TestContainerLauncher.java | 9 + .../launcher/TestContainerLauncherImpl.java | 9 + .../yarn/api/ContainerManagementProtocol.java | 19 ++ .../GetLocalizationStatusesRequest.java | 69 +++++ .../GetLocalizationStatusesResponse.java | 87 ++++++ .../yarn/api/records/LocalizationState.java | 36 +++ .../yarn/api/records/LocalizationStatus.java | 95 +++++++ .../proto/containermanagement_protocol.proto | 4 + .../src/main/proto/yarn_service_protos.proto | 28 ++ .../yarn/service/api/records/Container.java | 30 ++ .../api/records/LocalizationStatus.java | 132 +++++++++ .../yarn/service/component/Component.java | 9 +- .../component/instance/ComponentInstance.java | 141 +++++++++- .../ContainerLaunchService.java | 28 +- .../provider/AbstractProviderService.java | 12 +- .../service/provider/ProviderService.java | 40 ++- .../yarn/service/provider/ProviderUtils.java | 19 +- .../service/MockRunningServiceContext.java | 39 ++- .../hadoop/yarn/service/MockServiceAM.java | 2 +- .../hadoop/yarn/service/ServiceTestUtils.java | 2 + .../hadoop/yarn/service/TestServiceAM.java | 7 +- .../instance/TestComponentInstance.java | 61 ++++ .../service/provider/TestProviderUtils.java | 9 +- .../hadoop/yarn/client/api/NMClient.java | 34 +++ .../yarn/client/api/impl/NMClientImpl.java | 56 ++++ ...ntainerManagementProtocolPBClientImpl.java | 22 ++ ...tainerManagementProtocolPBServiceImpl.java | 20 ++ .../GetLocalizationStatusesRequestPBImpl.java | 156 +++++++++++ ...GetLocalizationStatusesResponsePBImpl.java | 260 ++++++++++++++++++ .../impl/pb/LocalizationStatusPBImpl.java | 192 +++++++++++++ .../yarn/api/records/impl/pb/ProtoUtils.java | 20 ++ .../hadoop/yarn/TestContainerLaunchRPC.java | 9 + .../TestContainerResourceIncreaseRPC.java | 9 + .../java/org/apache/hadoop/yarn/TestRPC.java | 9 + .../ContainerManagerImpl.java | 53 ++++ .../containermanager/container/Container.java | 7 + .../container/ContainerImpl.java | 16 +- .../localizer/ResourceSet.java | 45 ++- .../TestContainerManager.java | 128 +++++++++ .../localizer/TestResourceSet.java | 106 +++++++ .../nodemanager/webapp/MockContainer.java | 6 + .../server/resourcemanager/NodeManager.java | 9 + .../resourcemanager/TestAMAuthorization.java | 9 + .../TestApplicationMasterLauncher.java | 9 + 44 files changed, 2028 insertions(+), 34 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationState.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationStatus.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/LocalizationStatus.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesRequestPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesResponsePBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalizationStatusPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceSet.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index d5bf03d89b..222c2ae39b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; @@ -516,5 +518,12 @@ public ContainerUpdateResponse updateContainer(ContainerUpdateRequest request) throws YarnException, IOException { return null; } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) throws YarnException, + IOException { + return null; + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 0ae0380ea8..7788300cf4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -521,6 +523,13 @@ public ContainerUpdateResponse updateContainer(ContainerUpdateRequest request) throws YarnException, IOException { return null; } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) throws YarnException, + IOException { + return null; + } } @SuppressWarnings("serial") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 8fceb46e4c..0444440eba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; @@ -288,4 +290,21 @@ RollbackResponse rollbackLastReInitialization(ContainerId containerId) @Unstable CommitResponse commitLastReInitialization(ContainerId containerId) throws YarnException, IOException; + + /** + * API to request for the localization statuses of requested containers from + * the Node Manager. + * @param request {@link GetLocalizationStatusesRequest} which includes the + * container ids of all the containers whose localization + * statuses are needed. + * @return {@link GetLocalizationStatusesResponse} which contains the + * localization statuses of all the requested containers. + * @throws YarnException Exception specific to YARN. + * @throws IOException IOException thrown from the RPC layer. + */ + @Public + @Unstable + GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) throws YarnException, + IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesRequest.java new file mode 100644 index 0000000000..e6c39475a9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesRequest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +/** + * The request sent by an application master to the node manager to get + * {@link LocalizationStatus}es of containers. + * + * @see ContainerManagementProtocol#getLocalizationStatuses( + * GetLocalizationStatusesRequest) + */ +@Public +@Unstable +public abstract class GetLocalizationStatusesRequest { + + @Public + @Unstable + public static GetLocalizationStatusesRequest newInstance( + List containerIds) { + GetLocalizationStatusesRequest request = + Records.newRecord(GetLocalizationStatusesRequest.class); + request.setContainerIds(containerIds); + return request; + } + + /** + * Get the list of container IDs of the containers for which the localization + * statuses are needed. + * + * @return the list of container IDs. + */ + @Public + @Unstable + public abstract List getContainerIds(); + + /** + * Sets the list of container IDs of containers for which the localization + * statuses are needed. + * @param containerIds the list of container IDs. + */ + @Public + @Unstable + public abstract void setContainerIds(List containerIds); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesResponse.java new file mode 100644 index 0000000000..89fca9fbbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetLocalizationStatusesResponse.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; +import java.util.Map; + + +/** + * The response sent by the node manager to an application master when + * localization statuses are requested. + * + * @see ContainerManagementProtocol#getLocalizationStatuses( + * GetLocalizationStatusesRequest) + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public abstract class GetLocalizationStatusesResponse { + + public static GetLocalizationStatusesResponse newInstance( + Map> statuses, + Map failedRequests) { + GetLocalizationStatusesResponse response = + Records.newRecord(GetLocalizationStatusesResponse.class); + response.setLocalizationStatuses(statuses); + return response; + } + + /** + * Get all the container localization statuses. + * + * @return container localization statuses. + */ + public abstract Map> getLocalizationStatuses(); + + /** + * Sets the container localization statuses. + * + * @param statuses container localization statuses. + */ + @InterfaceAudience.Private + public abstract void setLocalizationStatuses( + Map> statuses); + + + /** + * Get the containerId-to-exception map in which the exception indicates error + * from per container for failed requests. + * + * @return map of containerId-to-exception + */ + @InterfaceAudience.Private + public abstract Map getFailedRequests(); + + /** + * Set the containerId-to-exception map in which the exception indicates error + * from per container for failed request. + */ + @InterfaceAudience.Private + public abstract void setFailedRequests( + Map failedContainers); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationState.java new file mode 100644 index 0000000000..0505d5f6d6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationState.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * State of localization. + */ +@Public +@Unstable +public enum LocalizationState { + + PENDING, + + COMPLETED, + + FAILED +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationStatus.java new file mode 100644 index 0000000000..bca95b7fef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalizationStatus.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * Represents the localization status of a resource. + * The status of the localization includes: + *

    + *
  • resource key
  • + *
  • {@link LocalizationState} of the resource
  • + *
+ */ +@Public +@Unstable +public abstract class LocalizationStatus { + + public static LocalizationStatus newInstance(String resourceKey, + LocalizationState localizationState) { + return newInstance(resourceKey, localizationState, null); + } + + public static LocalizationStatus newInstance(String resourceKey, + LocalizationState localizationState, + String diagnostics) { + LocalizationStatus status = Records.newRecord(LocalizationStatus.class); + status.setResourceKey(resourceKey); + status.setLocalizationState(localizationState); + status.setDiagnostics(diagnostics); + return status; + } + + /** + * Get the resource key. + * + * @return resource key. + */ + public abstract String getResourceKey(); + + /** + * Sets the resource key. + * @param resourceKey + */ + @InterfaceAudience.Private + public abstract void setResourceKey(String resourceKey); + + /** + * Get the localization sate. + * + * @return localization state. + */ + public abstract LocalizationState getLocalizationState(); + + /** + * Sets the localization state. + * @param state localization state + */ + @InterfaceAudience.Private + public abstract void setLocalizationState(LocalizationState state); + + /** + * Get the diagnostics. + * + * @return diagnostics. + */ + public abstract String getDiagnostics(); + + /** + * Sets the diagnostics. + * @param diagnostics diagnostics. + */ + @InterfaceAudience.Private + public abstract void setDiagnostics(String diagnostics); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index 22b440693d..1f8cafbd7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -44,4 +44,8 @@ service ContainerManagementProtocolService { rpc restartContainer(ContainerIdProto) returns (RestartContainerResponseProto); rpc rollbackLastReInitialization(ContainerIdProto) returns (RollbackResponseProto); rpc commitLastReInitialization(ContainerIdProto) returns (CommitResponseProto); + + rpc getLocalizationStatuses(GetLocalizationStatusesRequestProto) + returns (GetLocalizationStatusesResponseProto); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 248f775bde..b58b828138 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -543,3 +543,31 @@ message RunSharedCacheCleanerTaskRequestProto { message RunSharedCacheCleanerTaskResponseProto { optional bool accepted = 1; } + +// Localization +message GetLocalizationStatusesRequestProto { + repeated ContainerIdProto container_id = 1; +} + +message GetLocalizationStatusesResponseProto { + repeated ContainerLocalizationStatusesProto cntn_localization_statuses = 1; + repeated ContainerExceptionMapProto failed_requests = 2; +} + +enum LocalizationStateProto { + L_PENDING = 1; + L_COMPLETED = 2; + L_FAILED = 3; +} + + +message LocalizationStatusProto { + optional string resource_key = 1; + optional LocalizationStateProto localization_state= 2; + optional string diagnostics = 3; +} + +message ContainerLocalizationStatusesProto { + optional ContainerIdProto container_id = 1; + repeated LocalizationStatusProto localization_statuses = 2; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java index 48d54e9d67..99ba799ba1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Container.java @@ -55,6 +55,7 @@ public class Container extends BaseResource { private Artifact artifact = null; private Boolean privilegedContainer = null; private Map>> exposedPorts = null; + private List localizationStatuses = null; /** * Unique container id of a running service, e.g. @@ -258,6 +259,35 @@ public void setExposedPorts(Map>> ports) { this.exposedPorts = ports; } + /** + * Localization statuses. + */ + @ApiModelProperty(example = "null", value = + "Localization statuses of a container.") + @JsonProperty("localization_statuses") + public List getLocalizationStatuses() { + return localizationStatuses; + } + + /** + * Sets the localization statuses. + * @param statuses localization statuses. + */ + @XmlElement(name = "localization_statuses") + public void setLocalizationStatuses(List statuses) { + this.localizationStatuses = statuses; + } + + /** + * Sets the localization statuses and returns the container. + * @param statuses + * @return + */ + public Container localizationStatuses(List statuses) { + this.localizationStatuses = statuses; + return this; + } + @Override public boolean equals(java.lang.Object o) { if (this == o) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/LocalizationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/LocalizationStatus.java new file mode 100644 index 0000000000..3f76ba3a34 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/LocalizationStatus.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.api.records; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.LocalizationState; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; + +/** + * The status of localization. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +@ApiModel(description = "Localization status of a resource.") +@XmlRootElement +@JsonInclude(JsonInclude.Include.NON_NULL) +public class LocalizationStatus implements Serializable { + + private static final long serialVersionUID = -5745287278502373531L; + + private String destFile; + + private LocalizationState state; + + private String diagnostics; + + /** + * Destination file. + */ + @JsonProperty("dest_file") + public String getDestFile() { + return destFile; + } + + /** + * Sets the destination file. + * + * @param destFile destination file + */ + @XmlElement(name = "dest_file") + public void setDestFile(String destFile) { + this.destFile = destFile; + } + + /** + * Sets the destination file and returns the localization status. + * + * @param fileName destination file + */ + public LocalizationStatus destFile(String fileName) { + this.destFile = fileName; + return this; + } + + /** + * Localization state. + */ + @JsonProperty("state") + public LocalizationState getState() { + return state; + } + + /** + * Sets the localization state. + * + * @param localizationState localization state + */ + @XmlElement(name = "state") + public void setState(LocalizationState localizationState) { + this.state = localizationState; + } + + /** + * Sets the localization state and returns the localization status. + * + * @param localizationState localization state + */ + public LocalizationStatus state(LocalizationState localizationState) { + this.state = localizationState; + return this; + } + + /** + * Diagnostics. + */ + @JsonProperty("diagnostics") + public String getDiagnostics() { + return diagnostics; + } + + /** + * Sets the diagnostics. + * + * @param diag diagnostics + */ + @XmlElement(name = "diagnostics") + public void setDiagnostics(String diag) { + this.diagnostics = diag; + } + + /** + * Sets the diagnostics and returns the localization status. + * + * @param diag diagnostics + */ + public LocalizationStatus diagnostics(String diag) { + this.diagnostics = diag; + return this; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index f885b25d4d..8958dc70e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; import org.apache.hadoop.yarn.service.monitor.probe.Probe; import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; +import org.apache.hadoop.yarn.service.provider.ProviderService; import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceUtils; @@ -79,6 +80,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -695,19 +697,22 @@ private void assignContainerToCompInstance(Container container) { "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ", getName(), container.getId(), instance.getCompInstanceName(), container.getNodeId()); + Future resolvedParamFuture; if (!(upgradeStatus.isCompleted() && cancelUpgradeStatus.isCompleted())) { UpgradeStatus status = !cancelUpgradeStatus.isCompleted() ? cancelUpgradeStatus : upgradeStatus; - scheduler.getContainerLaunchService() + resolvedParamFuture = scheduler.getContainerLaunchService() .launchCompInstance(scheduler.getApp(), instance, container, createLaunchContext(status.getTargetSpec(), status.getTargetVersion())); } else { - scheduler.getContainerLaunchService().launchCompInstance( + resolvedParamFuture = scheduler.getContainerLaunchService() + .launchCompInstance( scheduler.getApp(), instance, container, createLaunchContext(componentSpec, scheduler.getApp().getVersion())); } + instance.updateResolvedLaunchParams(resolvedParamFuture); } public ContainerLaunchService.ComponentLaunchContext createLaunchContext( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 27153da262..ec62194544 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalizationState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.api.records.LocalizationStatus; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; @@ -51,6 +53,7 @@ import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; import org.apache.hadoop.yarn.service.monitor.probe.DefaultProbe; import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus; +import org.apache.hadoop.yarn.service.provider.ProviderService; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; import org.apache.hadoop.yarn.service.utils.ServiceUtils; @@ -65,10 +68,14 @@ import java.io.IOException; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Date; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -115,6 +122,8 @@ public class ComponentInstance implements EventHandler, private String serviceVersion; private AtomicBoolean upgradeInProgress = new AtomicBoolean(false); private boolean pendingCancelUpgrade = false; + private ProviderService.ResolvedLaunchParams resolvedParams; + private ScheduledFuture lclizationRetrieverFuture; private static final StateMachineFactory + statusesFromNM = scheduler.getNmClient().getClient() + .getLocalizationStatuses(container.getId(), container.getNodeId()); + if (statusesFromNM != null && !statusesFromNM.isEmpty()) { + updateLocalizationStatuses(statusesFromNM); + } + } catch (YarnException | IOException e) { + LOG.warn("{} failure getting localization statuses", container.getId(), + e); + } } private static class ContainerBecomeNotReadyTransition extends BaseTransition { @@ -411,6 +435,7 @@ public void transition(ComponentInstance compInstance, (status != null ? status.getDiagnostics() : UPGRADE_FAILED)); compInstance.diagnostics.append(containerDiag + System.lineSeparator()); compInstance.cancelContainerStatusRetriever(); + compInstance.cancelLclRetriever(); if (compInstance.getState().equals(READY)) { compInstance.component.decContainersReady(true); @@ -639,13 +664,16 @@ private void cancelUpgrade() { private void reInitHelper(Component.UpgradeStatus upgradeStatus) { cancelContainerStatusRetriever(); + cancelLclRetriever(); setContainerStatus(container.getId(), null); scheduler.executorService.submit(() -> cleanupRegistry(container.getId())); - scheduler.getContainerLaunchService() + Future launchParamsFuture = + scheduler.getContainerLaunchService() .reInitCompInstance(scheduler.getApp(), this, this.container, this.component.createLaunchContext( upgradeStatus.getTargetSpec(), upgradeStatus.getTargetVersion())); + updateResolvedLaunchParams(launchParamsFuture); } private void initializeStatusRetriever(ComponentInstanceEvent event, @@ -750,6 +778,61 @@ public String getCompInstanceName() { return compInstanceId.getCompInstanceName(); } + @VisibleForTesting + void updateLocalizationStatuses( + List statuses) { + Map resourcesCpy = new HashMap<>(); + try { + readLock.lock(); + if (resolvedParams == null || resolvedParams.didLaunchFail() || + resolvedParams.getResolvedRsrcPaths() == null || + resolvedParams.getResolvedRsrcPaths().isEmpty()) { + cancelLclRetriever(); + return; + } + resourcesCpy.putAll(resolvedParams.getResolvedRsrcPaths()); + } finally { + readLock.unlock(); + } + boolean allCompleted = true; + Map fromNM = new HashMap<>(); + statuses.forEach(statusFromNM -> { + LocalizationStatus lstatus = new LocalizationStatus() + .destFile(statusFromNM.getResourceKey()) + .diagnostics(statusFromNM.getDiagnostics()) + .state(statusFromNM.getLocalizationState()); + fromNM.put(statusFromNM.getResourceKey(), lstatus); + }); + + for (String resourceKey : resourcesCpy.keySet()) { + LocalizationStatus lstatus = fromNM.get(resourceKey); + if (lstatus == null || + lstatus.getState().equals(LocalizationState.PENDING)) { + allCompleted = false; + break; + } + } + + List statusList = new ArrayList<>(); + statusList.addAll(fromNM.values()); + this.containerSpec.setLocalizationStatuses(statusList); + if (allCompleted) { + cancelLclRetriever(); + } + } + + public void updateResolvedLaunchParams( + Future future) { + try { + writeLock.lock(); + this.resolvedParams = future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("{} updating resolved params", getCompInstanceId(), e); + } finally { + writeLock.unlock(); + } + } + public ContainerStatus getContainerStatus() { try { readLock.lock(); @@ -916,6 +999,7 @@ public void destroy() { cancelContainerStatusRetriever(); scheduler.executorService.submit(() -> cleanupRegistryAndCompHdfsDir(containerId)); + cancelLclRetriever(); } private void cleanupRegistry(ContainerId containerId) { @@ -998,6 +1082,61 @@ private void cancelContainerStatusRetriever() { } } + private static class LocalizationStatusRetriever implements Runnable { + private ContainerId containerId; + private NodeId nodeId; + private NMClient nmClient; + private ComponentInstance instance; + + LocalizationStatusRetriever(ServiceScheduler scheduler, + ContainerId containerId, ComponentInstance instance) { + this.nmClient = scheduler.getNmClient().getClient(); + this.containerId = containerId; + this.instance = instance; + this.nodeId = instance.getNodeId(); + } + + @Override + public void run() { + List + statusesFromNM = null; + try { + statusesFromNM = nmClient.getLocalizationStatuses(containerId, + nodeId); + } catch (YarnException | IOException e) { + LOG.error("{} Failed to get localization statuses for {} {} ", + instance.compInstanceId, nodeId, containerId, e); + } + if (statusesFromNM != null && !statusesFromNM.isEmpty()) { + instance.updateLocalizationStatuses(statusesFromNM); + } + } + } + + private void initializeLocalizationStatusRetriever( + ContainerId containerId) { + LOG.info("{} retrieve localization statuses", compInstanceId); + lclizationRetrieverFuture = scheduler.executorService.scheduleAtFixedRate( + new LocalizationStatusRetriever(scheduler, containerId, this), + 0, 1, TimeUnit.SECONDS + ); + } + + private void cancelLclRetriever() { + if (lclizationRetrieverFuture != null && + !lclizationRetrieverFuture.isDone()) { + LOG.info("{} cancelling localization retriever", compInstanceId); + lclizationRetrieverFuture.cancel(true); + } + } + + @VisibleForTesting + boolean isLclRetrieverActive() { + return lclizationRetrieverFuture != null && + !lclizationRetrieverFuture.isCancelled() + && !lclizationRetrieverFuture.isDone(); + } + public String getHostname() { return getCompInstanceName() + getComponent().getHostnameSuffix(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java index 153ab465f2..1574d6df1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java @@ -34,8 +34,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.apache.hadoop.yarn.service.provider.ProviderService.FAILED_LAUNCH_PARAMS; public class ContainerLaunchService extends AbstractService{ @@ -65,24 +69,27 @@ protected void serviceStop() throws Exception { super.serviceStop(); } - public void launchCompInstance(Service service, + public Future launchCompInstance( + Service service, ComponentInstance instance, Container container, ComponentLaunchContext componentLaunchContext) { ContainerLauncher launcher = new ContainerLauncher(service, instance, container, componentLaunchContext, false); - executorService.execute(launcher); + return executorService.submit(launcher); } - public void reInitCompInstance(Service service, + public Future reInitCompInstance( + Service service, ComponentInstance instance, Container container, ComponentLaunchContext componentLaunchContext) { ContainerLauncher reInitializer = new ContainerLauncher(service, instance, container, componentLaunchContext, true); - executorService.execute(reInitializer); + return executorService.submit(reInitializer); } - private class ContainerLauncher implements Runnable { + private class ContainerLauncher implements + Callable { public final Container container; public final Service service; public ComponentInstance instance; @@ -99,12 +106,14 @@ private class ContainerLauncher implements Runnable { this.reInit = reInit; } - @Override public void run() { + @Override + public ProviderService.ResolvedLaunchParams call() { ProviderService provider = ProviderFactory.getProviderService( componentLaunchContext.getArtifact()); AbstractLauncher launcher = new AbstractLauncher(context); + ProviderService.ResolvedLaunchParams resolvedParams = null; try { - provider.buildContainerLaunchContext(launcher, service, + resolvedParams = provider.buildContainerLaunchContext(launcher, service, instance, fs, getConfig(), container, componentLaunchContext); if (!reInit) { LOG.info("launching container {}", container.getId()); @@ -126,6 +135,11 @@ private class ContainerLauncher implements Runnable { .setInstance(instance).setContainerId(container.getId()); context.scheduler.getDispatcher().getEventHandler().handle(event); } + if (resolvedParams != null) { + return resolvedParams; + } else { + return FAILED_LAUNCH_PARAMS; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java index 4394e62922..52f2a4eb01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java @@ -136,11 +136,13 @@ public void buildContainerRetry(AbstractLauncher launcher, } } - public void buildContainerLaunchContext(AbstractLauncher launcher, + public ResolvedLaunchParams buildContainerLaunchContext( + AbstractLauncher launcher, Service service, ComponentInstance instance, SliderFileSystem fileSystem, Configuration yarnConf, Container container, ContainerLaunchService.ComponentLaunchContext compLaunchContext) throws IOException, SliderException { + ResolvedLaunchParams resolved = new ResolvedLaunchParams(); processArtifact(launcher, instance, fileSystem, service, compLaunchContext); ServiceContext context = @@ -154,13 +156,13 @@ public void buildContainerLaunchContext(AbstractLauncher launcher, fileSystem, yarnConf, container, compLaunchContext, tokensForSubstitution); - // create config file on hdfs and add local resource + // create config file on hdfs and addResolvedRsrcPath local resource ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem, - compLaunchContext, tokensForSubstitution, instance, context); + compLaunchContext, tokensForSubstitution, instance, context, resolved); // handles static files (like normal file / archive file) for localization. ProviderUtils.handleStaticFilesForLocalization(launcher, fileSystem, - compLaunchContext); + compLaunchContext, resolved); // replace launch command with token specific information buildContainerLaunchCommand(launcher, service, instance, fileSystem, @@ -168,5 +170,7 @@ public void buildContainerLaunchContext(AbstractLauncher launcher, // Setup container retry settings buildContainerRetry(launcher, yarnConf, compLaunchContext, instance); + + return resolved; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java index fe765de695..96b24d2013 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.provider; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.service.api.records.Service; @@ -28,16 +29,53 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; public interface ProviderService { /** * Set up the entire container launch context */ - void buildContainerLaunchContext(AbstractLauncher containerLauncher, + ResolvedLaunchParams buildContainerLaunchContext( + AbstractLauncher containerLauncher, Service service, ComponentInstance instance, SliderFileSystem sliderFileSystem, Configuration yarnConf, Container container, ContainerLaunchService.ComponentLaunchContext componentLaunchContext) throws IOException, SliderException; + + /** + * This holds any information that is resolved during building the launch + * context for a container. + *

+ * Right now it contains a mapping of resource keys to destination files + * for resources that need to be localized. + */ + class ResolvedLaunchParams { + private Map resolvedRsrcPaths = new HashMap<>(); + + void addResolvedRsrcPath(String resourceKey, String destFile) { + Preconditions.checkNotNull(destFile, "dest file cannot be null"); + Preconditions.checkNotNull(resourceKey, + "local resource cannot be null"); + resolvedRsrcPaths.put(resourceKey, destFile); + } + + public Map getResolvedRsrcPaths() { + return this.resolvedRsrcPaths; + } + + public boolean didLaunchFail() { + return false; + } + } + + ResolvedLaunchParams FAILED_LAUNCH_PARAMS = new ResolvedLaunchParams() { + @Override + public boolean didLaunchFail() { + return true; + } + }; + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java index c12c3407ec..88883f71f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java @@ -180,7 +180,10 @@ public static synchronized void createConfigFileAndAddLocalResource( AbstractLauncher launcher, SliderFileSystem fs, ContainerLaunchService.ComponentLaunchContext compLaunchContext, Map tokensForSubstitution, ComponentInstance instance, - ServiceContext context) throws IOException { + ServiceContext context, ProviderService.ResolvedLaunchParams + resolvedParams) + throws IOException { + Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance); if (!fs.getFileSystem().exists(compInstanceDir)) { log.info("{} version {} : Creating dir on hdfs: {}", @@ -254,13 +257,15 @@ public static synchronized void createConfigFileAndAddLocalResource( fs.createAmResource(remoteFile, LocalResourceType.FILE); Path destFile = new Path(configFile.getDestFile()); String symlink = APP_CONF_DIR + "/" + fileName; - addLocalResource(launcher, symlink, configResource, destFile); + addLocalResource(launcher, symlink, configResource, destFile, + resolvedParams); } } public static synchronized void handleStaticFilesForLocalization( AbstractLauncher launcher, SliderFileSystem fs, ContainerLaunchService - .ComponentLaunchContext componentLaunchCtx) + .ComponentLaunchContext componentLaunchCtx, + ProviderService.ResolvedLaunchParams resolvedParams) throws IOException { for (ConfigFile staticFile : componentLaunchCtx.getConfiguration().getFiles()) { @@ -298,13 +303,14 @@ public static synchronized void handleStaticFilesForLocalization( .isEmpty()) { destFile = new Path(staticFile.getDestFile()); } - - addLocalResource(launcher, destFile.getName(), localResource, destFile); + addLocalResource(launcher, destFile.getName(), localResource, destFile, + resolvedParams); } } private static void addLocalResource(AbstractLauncher launcher, - String symlink, LocalResource localResource, Path destFile) { + String symlink, LocalResource localResource, Path destFile, + ProviderService.ResolvedLaunchParams resolvedParams) { if (destFile.isAbsolute()) { launcher.addLocalResource(symlink, localResource, destFile.toString()); log.info("Added file for localization: "+ symlink +" -> " + @@ -315,6 +321,7 @@ private static void addLocalResource(AbstractLauncher launcher, log.info("Added file for localization: " + symlink+ " -> " + localResource.getResource().getFile()); } + resolvedParams.addResolvedRsrcPath(symlink, destFile.toString()); } // Static file is files uploaded by users before launch the service. Which diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java index b685f4b370..0245cd6fa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.google.common.util.concurrent.Futures; import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -35,16 +36,22 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; +import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; +import org.apache.hadoop.yarn.service.exceptions.SliderException; +import org.apache.hadoop.yarn.service.provider.ProviderService; +import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.utils.ServiceUtils; +import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Future; import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -112,12 +119,38 @@ protected ServiceManager createServiceManager() { this.scheduler.init(fsWatcher.getConf()); + when(mockLaunchService.launchCompInstance(anyObject(), anyObject(), + anyObject(), anyObject())).thenAnswer( + (Answer>) + this::launchAndReinitHelper); - doNothing().when(mockLaunchService). - reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); + when(mockLaunchService.reInitCompInstance(anyObject(), anyObject(), + anyObject(), anyObject())).thenAnswer(( + Answer>) + this::launchAndReinitHelper); stabilizeComponents(this); } + private Future launchAndReinitHelper( + InvocationOnMock invocation) throws IOException, SliderException { + AbstractLauncher launcher = new AbstractLauncher( + scheduler.getContext()); + ComponentInstance instance = (ComponentInstance) + invocation.getArguments()[1]; + Container container = (Container) invocation.getArguments()[2]; + ContainerLaunchService.ComponentLaunchContext clc = + (ContainerLaunchService.ComponentLaunchContext) + invocation.getArguments()[3]; + + ProviderService.ResolvedLaunchParams resolvedParams = + new ProviderService.ResolvedLaunchParams(); + ProviderUtils.createConfigFileAndAddLocalResource(launcher, fs, clc, + new HashMap<>(), instance, scheduler.getContext(), resolvedParams); + ProviderUtils.handleStaticFilesForLocalization(launcher, fs, clc, + resolvedParams); + return Futures.immediateFuture(resolvedParams); + } + private void stabilizeComponents(ServiceContext context) { ApplicationId appId = ApplicationId.fromString(context.service.getId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java index 729287cfcb..ae59c90be4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java @@ -453,6 +453,6 @@ protected ByteBuffer recordTokensForContainers() public void waitForContainerToRelease(ContainerId containerId) throws TimeoutException, InterruptedException { GenericTestUtils.waitFor(() -> releasedContainers.contains(containerId), - 1000, 9990000); + 1000, 30000); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 6207d63205..02cf6012eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -62,6 +62,7 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URL; +import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import java.util.Map; @@ -403,6 +404,7 @@ protected void starting(Description description) { description.getClassName(), description.getMethodName()); conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString()); try { + Files.createDirectories(serviceBasePath); fs = new SliderFileSystem(conf); fs.setAppDir(new Path(serviceBasePath.toString())); } catch (IOException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java index 51c27e892c..bbcbee2468 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java @@ -395,7 +395,7 @@ public void testIPChange() throws TimeoutException, // Test to verify that the containers are released and the // component instance is added to the pending queue when building the launch // context fails. - @Test(timeout = 9990000) + @Test(timeout = 30000) public void testContainersReleasedWhenPreLaunchFails() throws Exception { ApplicationId applicationId = ApplicationId.newInstance( @@ -420,6 +420,11 @@ public void testContainersReleasedWhenPreLaunchFails() // allocate a container am.feedContainerToComp(exampleApp, containerId, "compa"); am.waitForContainerToRelease(containerId); + ComponentInstance compAinst0 = am.getCompInstance(compA.getName(), + "compa-0"); + GenericTestUtils.waitFor(() -> + am.getComponent(compA.getName()).getPendingInstances() + .contains(compAinst0), 2000, 30000); Assert.assertEquals(1, am.getComponent("compa").getPendingInstances().size()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index c3b1602c47..f6ead01c59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -18,16 +18,22 @@ package org.apache.hadoop.yarn.service.component.instance; +import com.google.common.collect.Lists; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalizationState; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; +import org.apache.hadoop.yarn.service.MockRunningServiceContext; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.api.records.Configuration; +import org.apache.hadoop.yarn.service.TestServiceManager; +import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; @@ -41,6 +47,9 @@ import org.junit.Test; import org.mockito.Mockito; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -261,6 +270,58 @@ public void testCancelFailedWhileUpgradeWithFailure() throws Exception { validateCancelWhileUpgrading(false, false); } + @Test + public void testUpdateLocalizationStatuses() throws Exception { + Service def = TestServiceManager.createBaseDef( + "testUpdateLocalizationStatuses"); + + String file1 = rule.getServiceBasePath().toString() + "/file1"; + Files.write(Paths.get(file1), "test file".getBytes(), + StandardOpenOption.CREATE_NEW); + + org.apache.hadoop.yarn.service.api.records.Component compDef = + def.getComponents().iterator().next(); + ConfigFile configFile1 = new ConfigFile(); + configFile1.setType(ConfigFile.TypeEnum.STATIC); + configFile1.setSrcFile(file1); + compDef.setConfiguration(new Configuration().files( + Lists.newArrayList(configFile1))); + + ServiceContext context = new MockRunningServiceContext(rule, def); + Component component = context.scheduler.getAllComponents().get( + compDef.getName()); + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + LocalizationStatus status = LocalizationStatus.newInstance("file1", + LocalizationState.PENDING); + + instance.updateLocalizationStatuses(Lists.newArrayList(status)); + Assert.assertTrue("retriever should still be active", + instance.isLclRetrieverActive()); + + Container container = instance.getContainerSpec(); + Assert.assertTrue(container.getLocalizationStatuses() != null); + Assert.assertEquals("dest file", + container.getLocalizationStatuses().get(0).getDestFile(), + status.getResourceKey()); + Assert.assertEquals("state", + container.getLocalizationStatuses().get(0).getState(), + status.getLocalizationState()); + + status = LocalizationStatus.newInstance("file1", + LocalizationState.COMPLETED); + instance.updateLocalizationStatuses(Lists.newArrayList(status)); + Assert.assertTrue("retriever should not be active", + !instance.isLclRetrieverActive()); + Assert.assertTrue(container.getLocalizationStatuses() != null); + Assert.assertEquals("dest file", + container.getLocalizationStatuses().get(0).getDestFile(), + status.getResourceKey()); + Assert.assertEquals("state", + container.getLocalizationStatuses().get(0).getState(), + status.getLocalizationState()); + } + private void validateCancelWhileUpgrading(boolean upgradeSuccessful, boolean cancelUpgradeSuccessful) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java index 5d794d251c..ff1fb7f7a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/provider/TestProviderUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -152,13 +153,19 @@ public void setShouldBeUploadedToSharedCache( configFileList.add(new ConfigFile().srcFile("hdfs://default/sourceFile4") .type(ConfigFile.TypeEnum.STATIC)); + ProviderService.ResolvedLaunchParams resolved = + new ProviderService.ResolvedLaunchParams(); ProviderUtils.handleStaticFilesForLocalization(launcher, sfs, - compLaunchCtx); + compLaunchCtx, resolved); Mockito.verify(launcher).addLocalResource(Mockito.eq("destFile1"), any(LocalResource.class)); Mockito.verify(launcher).addLocalResource( Mockito.eq("destFile_2"), any(LocalResource.class)); Mockito.verify(launcher).addLocalResource( Mockito.eq("sourceFile4"), any(LocalResource.class)); + + Assert.assertEquals(3, resolved.getResolvedRsrcPaths().size()); + Assert.assertEquals(resolved.getResolvedRsrcPaths().get("destFile1"), + "destFile1"); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 17168f799e..a8b64cc6dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; @@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -244,7 +247,38 @@ public NMTokenCache getNMTokenCache() { * @return NodeId of the container on which it is running. */ public NodeId getNodeIdOfStartedContainer(ContainerId containerId) { + return null; } + /** + * Localize resources for a container. + * @param containerId the ID of the container + * @param nodeId node Id of the container + * @param localResources resources to localize + */ + @InterfaceStability.Unstable + public void localize(ContainerId containerId, NodeId nodeId, + Map localResources) throws YarnException, + IOException { + // do nothing. + } + + /** + * Get the localization statuses of a container. + * + * @param containerId the Id of the container + * @param nodeId node Id of the container + * + * @return the status of a container. + * + * @throws YarnException YarnException. + * @throws IOException IOException. + */ + @InterfaceStability.Unstable + public List getLocalizationStatuses( + ContainerId containerId, NodeId nodeId) throws YarnException, + IOException { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index 017756e722..96a93c290f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -36,7 +37,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -48,6 +52,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMClient; @@ -464,4 +470,54 @@ public NodeId getNodeIdOfStartedContainer(ContainerId containerId) { return null; } + @Override + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + public void localize(ContainerId containerId, NodeId nodeId, + Map localResources) throws YarnException, + IOException { + ContainerManagementProtocolProxyData proxy; + StartedContainer container = startedContainers.get(containerId); + if (container != null) { + synchronized (container) { + proxy = cmProxy.getProxy(container.getNodeId().toString(), containerId); + try { + proxy.getContainerManagementProtocol().localize( + ResourceLocalizationRequest.newInstance(containerId, + localResources)); + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } + } + } + } else { + throw new YarnException("Unknown container [" + containerId + "]"); + } + } + + @Override + public List getLocalizationStatuses( + ContainerId containerId, NodeId nodeId) throws YarnException, + IOException { + + ContainerManagementProtocolProxyData proxy = null; + List containerIds = Lists.newArrayList(containerId); + try { + proxy = cmProxy.getProxy(nodeId.toString(), containerId); + GetLocalizationStatusesResponse response = + proxy.getContainerManagementProtocol().getLocalizationStatuses( + GetLocalizationStatusesRequest.newInstance(containerIds)); + if (response.getFailedRequests() != null + && response.getFailedRequests().containsKey(containerId)) { + Throwable t = + response.getFailedRequests().get(containerId).deSerialize(); + parseAndThrowException(t); + } + return response.getLocalizationStatuses().get(containerId); + } finally { + if (proxy != null) { + cmProxy.mayBeCloseProxy(proxy); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index 7e471f34de..af42021014 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; @@ -52,6 +54,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLocalizationStatusesRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLocalizationStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl; @@ -74,6 +78,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; @@ -280,4 +285,21 @@ public CommitResponse commitLastReInitialization(ContainerId containerId) return null; } } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) + throws YarnException, IOException { + GetLocalizationStatusesRequestProto requestProto = + ((GetLocalizationStatusesRequestPBImpl) request).getProto(); + try { + return new GetLocalizationStatusesResponsePBImpl( + proxy.getLocalizationStatuses(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index 68e164582d..ad8a75684b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse; @@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CommitResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ContainerUpdateRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ContainerUpdateResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLocalizationStatusesRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLocalizationStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; @@ -65,6 +68,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReInitializeContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReInitializeContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto; @@ -264,4 +269,19 @@ public CommitResponseProto commitLastReInitialization( throw new ServiceException(e); } } + + @Override + public GetLocalizationStatusesResponseProto getLocalizationStatuses( + RpcController controller, GetLocalizationStatusesRequestProto request) + throws ServiceException { + GetLocalizationStatusesRequestPBImpl lclReq = + new GetLocalizationStatusesRequestPBImpl(request); + try { + GetLocalizationStatusesResponse response = real.getLocalizationStatuses( + lclReq); + return ((GetLocalizationStatusesResponsePBImpl)response).getProto(); + } catch (YarnException | IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesRequestPBImpl.java new file mode 100644 index 0000000000..783098fd7a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesRequestPBImpl.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesRequestProtoOrBuilder; + +import java.util.ArrayList; +import java.util.List; + +/** + * PB Impl of {@link GetLocalizationStatusesRequest}. + */ +@Private +@Unstable +public class GetLocalizationStatusesRequestPBImpl extends + GetLocalizationStatusesRequest { + private GetLocalizationStatusesRequestProto proto = + GetLocalizationStatusesRequestProto.getDefaultInstance(); + private GetLocalizationStatusesRequestProto.Builder builder; + private boolean viaProto = false; + + private List containerIds; + + public GetLocalizationStatusesRequestPBImpl() { + builder = GetLocalizationStatusesRequestProto.newBuilder(); + } + + public GetLocalizationStatusesRequestPBImpl( + GetLocalizationStatusesRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetLocalizationStatusesRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.containerIds != null) { + addLocalContainerIdsToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetLocalizationStatusesRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void addLocalContainerIdsToProto() { + maybeInitBuilder(); + builder.clearContainerId(); + if (this.containerIds == null) { + return; + } + List protoList = new ArrayList(); + for (ContainerId id : containerIds) { + protoList.add(convertToProtoFormat(id)); + } + builder.addAllContainerId(protoList); + } + + private void initLocalContainerIds() { + if (this.containerIds != null) { + return; + } + GetLocalizationStatusesRequestProtoOrBuilder p = viaProto ? proto : builder; + List toAdd = p.getContainerIdList(); + this.containerIds = new ArrayList<>(); + for (ContainerIdProto id : toAdd) { + this.containerIds.add(convertFromProtoFormat(id)); + } + } + + @Override + public List getContainerIds() { + initLocalContainerIds(); + return this.containerIds; + } + + @Override + public void setContainerIds(List containerIds) { + maybeInitBuilder(); + if (containerIds == null) { + builder.clearContainerId(); + } + this.containerIds = containerIds; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesResponsePBImpl.java new file mode 100644 index 0000000000..f42fa988c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetLocalizationStatusesResponsePBImpl.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.LocalizationStatusPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerExceptionMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerLocalizationStatusesProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLocalizationStatusesResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStatusProto; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * PB Impl of {@link GetLocalizationStatusesResponse}. + */ +@Private +@Unstable +public class GetLocalizationStatusesResponsePBImpl extends + GetLocalizationStatusesResponse { + private GetLocalizationStatusesResponseProto proto = + GetLocalizationStatusesResponseProto.getDefaultInstance(); + private GetLocalizationStatusesResponseProto.Builder builder; + private boolean viaProto = false; + + private Map> localizationStatuses; + private Map failedRequests; + + public GetLocalizationStatusesResponsePBImpl() { + builder = GetLocalizationStatusesResponseProto.newBuilder(); + } + + public GetLocalizationStatusesResponsePBImpl( + GetLocalizationStatusesResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetLocalizationStatusesResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.localizationStatuses != null) { + addLocalStatusesToProto(); + } + if (this.failedRequests != null) { + addFailedRequestsToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetLocalizationStatusesResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private void addLocalStatusesToProto() { + maybeInitBuilder(); + builder.clearCntnLocalizationStatuses(); + if (this.localizationStatuses == null) { + return; + } + List protos = + new ArrayList(); + + this.localizationStatuses.forEach((containerId, statuses) -> { + if (statuses != null && !statuses.isEmpty()) { + ContainerLocalizationStatusesProto.Builder clProtoBuilder = + ContainerLocalizationStatusesProto.newBuilder(); + statuses.forEach(status -> { + clProtoBuilder.addLocalizationStatuses(convertToProtoFormat(status)); + }); + clProtoBuilder.setContainerId(convertToProtoFormat(containerId)); + protos.add(clProtoBuilder.build()); + } + }); + builder.addAllCntnLocalizationStatuses(protos); + } + + private void addFailedRequestsToProto() { + maybeInitBuilder(); + builder.clearFailedRequests(); + if (this.failedRequests == null) { + return; + } + List protoList = + new ArrayList(); + for (Map.Entry entry : this.failedRequests + .entrySet()) { + protoList.add(ContainerExceptionMapProto.newBuilder() + .setContainerId(convertToProtoFormat(entry.getKey())) + .setException(convertToProtoFormat(entry.getValue())).build()); + } + builder.addAllFailedRequests(protoList); + } + + + private void initLocalContainerStatuses() { + if (localizationStatuses != null) { + return; + } + GetLocalizationStatusesResponseProtoOrBuilder p = viaProto ? proto : + builder; + List protoList = + p.getCntnLocalizationStatusesList(); + localizationStatuses = new HashMap<>(); + + for (ContainerLocalizationStatusesProto clProto : protoList) { + List lsProtos = + clProto.getLocalizationStatusesList(); + + List statusesPerCntn = new ArrayList<>(); + lsProtos.forEach(lsProto -> { + statusesPerCntn.add(convertFromProtoFormat(lsProto)); + }); + + localizationStatuses.put(convertFromProtoFormat(clProto.getContainerId()), + statusesPerCntn); + } + } + + private void initFailedRequests() { + if (this.failedRequests != null) { + return; + } + GetLocalizationStatusesResponseProtoOrBuilder p = viaProto ? proto : + builder; + List protoList = p.getFailedRequestsList(); + this.failedRequests = new HashMap<>(); + for (ContainerExceptionMapProto ce : protoList) { + this.failedRequests.put(convertFromProtoFormat(ce.getContainerId()), + convertFromProtoFormat(ce.getException())); + } + } + + @Override + public Map> getLocalizationStatuses() { + initLocalContainerStatuses(); + return this.localizationStatuses; + } + + @Override + public void setLocalizationStatuses( + Map> statuses) { + maybeInitBuilder(); + if (statuses == null) { + builder.clearCntnLocalizationStatuses(); + } + this.localizationStatuses = statuses; + } + + @Override + public Map getFailedRequests() { + initFailedRequests(); + return this.failedRequests; + } + + @Override + public void setFailedRequests( + Map failedRequests) { + maybeInitBuilder(); + if (failedRequests == null) { + builder.clearFailedRequests(); + } + this.failedRequests = failedRequests; + } + + private LocalizationStatusPBImpl convertFromProtoFormat( + LocalizationStatusProto p) { + return new LocalizationStatusPBImpl(p); + } + + private LocalizationStatusProto convertToProtoFormat( + LocalizationStatus t) { + return ((LocalizationStatusPBImpl) t).getProto(); + } + + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + + private SerializedExceptionPBImpl convertFromProtoFormat( + SerializedExceptionProto p) { + return new SerializedExceptionPBImpl(p); + } + + private SerializedExceptionProto convertToProtoFormat(SerializedException t) { + return ((SerializedExceptionPBImpl) t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalizationStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalizationStatusPBImpl.java new file mode 100644 index 0000000000..3e7a9fe685 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalizationStatusPBImpl.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.LocalizationState; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStateProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStatusProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStatusProtoOrBuilder; + +/** + * PB Impl of {@link LocalizationStatus}. + */ +@Private +@Unstable +public class LocalizationStatusPBImpl extends LocalizationStatus { + private LocalizationStatusProto proto = + LocalizationStatusProto.getDefaultInstance(); + private LocalizationStatusProto.Builder builder; + private boolean viaProto = false; + + private String resourceKey; + private LocalizationState localizationState; + private String diagnostics; + + public LocalizationStatusPBImpl() { + builder = LocalizationStatusProto.newBuilder(); + } + + public LocalizationStatusPBImpl(LocalizationStatusProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized LocalizationStatusProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("LocalizationStatus: ["); + sb.append("ResourceKey: ").append(getResourceKey()).append(", "); + sb.append("LocalizationState: ").append(getLocalizationState()) + .append(", "); + sb.append("Diagnostics: ").append(getDiagnostics()).append(", "); + sb.append("]"); + return sb.toString(); + } + + private void mergeLocalToBuilder() { + if (resourceKey != null) { + builder.setResourceKey(this.resourceKey); + } + if (localizationState != null) { + builder.setLocalizationState(convertToProtoFormat(localizationState)); + } + if (diagnostics != null) { + builder.setDiagnostics(diagnostics); + } + } + + private synchronized void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = LocalizationStatusProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public synchronized String getResourceKey() { + LocalizationStatusProtoOrBuilder p = viaProto ? proto : builder; + if (this.resourceKey != null) { + return this.resourceKey; + } + if (!p.hasResourceKey()) { + return null; + } + this.resourceKey = p.getResourceKey(); + return this.resourceKey; + } + + @Override + public synchronized void setResourceKey(String resourceKey) { + maybeInitBuilder(); + if (resourceKey == null) { + builder.clearResourceKey(); + } + this.resourceKey = resourceKey; + } + + @Override + public synchronized LocalizationState getLocalizationState() { + LocalizationStatusProtoOrBuilder p = viaProto ? proto : builder; + if (this.localizationState != null) { + return this.localizationState; + } + if (!p.hasLocalizationState()) { + return null; + } + this.localizationState = convertFromProtoFormat(p.getLocalizationState()); + return localizationState; + } + + @Override + public synchronized void setLocalizationState( + LocalizationState localizationState) { + maybeInitBuilder(); + if (localizationState == null) { + builder.clearLocalizationState(); + } + this.localizationState = localizationState; + } + + @Override + public synchronized String getDiagnostics() { + LocalizationStatusProtoOrBuilder p = viaProto ? proto : builder; + if (this.diagnostics != null) { + return this.diagnostics; + } + if (!p.hasDiagnostics()) { + return null; + } + this.diagnostics = p.getDiagnostics(); + return diagnostics; + } + + @Override + public synchronized void setDiagnostics(String diagnostics) { + maybeInitBuilder(); + if (diagnostics == null) { + builder.clearDiagnostics(); + } + this.diagnostics = diagnostics; + } + + private LocalizationStateProto convertToProtoFormat(LocalizationState e) { + return ProtoUtils.convertToProtoFormat(e); + } + + private LocalizationState convertFromProtoFormat(LocalizationStateProto e) { + return ProtoUtils.convertFromProtoFormat(e); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index f175cf3e6c..3f360d764c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LocalizationState; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -92,12 +93,16 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeUpdateTypeProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.LocalizationStateProto; import org.apache.hadoop.yarn.server.api.ContainerType; import com.google.common.collect.Interner; import com.google.common.collect.Interners; import com.google.protobuf.ByteString; +/** + * Utils to convert enum protos to corresponding java enums and vice versa. + */ @Private @Unstable public class ProtoUtils { @@ -596,6 +601,21 @@ public static ApplicationIdPBImpl convertFromProtoFormat( public static ApplicationIdProto convertToProtoFormat(ApplicationId t) { return ((ApplicationIdPBImpl) t).getProto(); } + + //Localization State + private final static String LOCALIZATION_STATE_PREFIX = "L_"; + public static LocalizationStateProto convertToProtoFormat( + LocalizationState e) { + return LocalizationStateProto.valueOf(LOCALIZATION_STATE_PREFIX + e.name()); + } + + public static LocalizationState convertFromProtoFormat( + LocalizationStateProto e) { + return LocalizationState.valueOf(e.name() + .replace(LOCALIZATION_STATE_PREFIX, "")); + } + } + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index dfe7534974..34e2198480 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -245,5 +247,12 @@ public ContainerUpdateResponse updateContainer(ContainerUpdateRequest request) throws YarnException, IOException { return null; } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) throws YarnException, + IOException { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java index 6e9728475e..1690b815d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; @@ -227,5 +229,12 @@ public CommitResponse commitLastReInitialization(ContainerId containerId) throws YarnException, IOException { return null; } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) + throws YarnException, IOException { + return null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 82dfaea32a..dedabc07d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -420,6 +422,13 @@ public ContainerUpdateResponse updateContainer(ContainerUpdateRequest request) throws YarnException, IOException { return null; } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) throws YarnException, + IOException { + return null; + } } public static ContainerTokenIdentifier newContainerTokenIdentifier( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 256ae876b1..2ca63ae7e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -20,6 +20,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; @@ -1963,4 +1966,54 @@ public void handleCredentialUpdate() { dispatcher.getEventHandler().handle(new LogHandlerTokenUpdatedEvent()); } } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) throws YarnException, + IOException { + Map> allStatuses = new HashMap<>(); + Map failedRequests = new HashMap<>(); + + UserGroupInformation remoteUgi = getRemoteUgi(); + NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi); + if (identifier == null) { + throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG); + } + String remoteUser = remoteUgi.getUserName(); + for (ContainerId id : request.getContainerIds()) { + try { + List statuses = getLocalizationStatusesInternal(id, + identifier, remoteUser); + allStatuses.put(id, statuses); + } catch (YarnException e) { + failedRequests.put(id, SerializedException.newInstance(e)); + } + } + return GetLocalizationStatusesResponse.newInstance(allStatuses, + failedRequests); + } + + private List getLocalizationStatusesInternal( + ContainerId containerID, + NMTokenIdentifier nmTokenIdentifier, String remoteUser) + throws YarnException { + Container container = this.context.getContainers().get(containerID); + + LOG.info("Getting localization status for {}", containerID); + authorizeGetAndStopContainerRequest(containerID, container, false, + nmTokenIdentifier, remoteUser); + + String containerIDStr = containerID.toString(); + if (container == null) { + if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " was recently stopped on node manager."); + } else { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is not handled by this NodeManager"); + } + } + return container.getLocalizationStatuses(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 05658854b4..5a457c9015 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; @@ -125,4 +126,10 @@ public interface Container extends EventHandler { * @return true/false based on container's state */ boolean isContainerInFinalStates(); + + /** + * Get the localization statuses. + * @return localization statuses. + */ + List getLocalizationStatuses(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 1d6ba2e110..cfa62577ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -36,6 +36,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerSubState; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1469,7 +1470,8 @@ public void transition(ContainerImpl container, ContainerEvent event) { ContainerResourceFailedEvent failedEvent = (ContainerResourceFailedEvent) event; container.resourceSet - .resourceLocalizationFailed(failedEvent.getResource()); + .resourceLocalizationFailed(failedEvent.getResource(), + failedEvent.getDiagnosticMessage()); container.addDiagnostics(failedEvent.getDiagnosticMessage()); } } @@ -1485,7 +1487,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { ContainerResourceFailedEvent failedEvent = (ContainerResourceFailedEvent) event; container.resourceSet.resourceLocalizationFailed( - failedEvent.getResource()); + failedEvent.getResource(), failedEvent.getDiagnosticMessage()); container.addDiagnostics("Container aborting re-initialization.. " + failedEvent.getDiagnosticMessage()); LOG.error("Container [" + container.getContainerId() + "] Re-init" + @@ -2288,4 +2290,14 @@ public boolean isContainerInFinalStates() { public void setExposedPorts(String ports) { this.exposedPorts = ports; } + + @Override + public List getLocalizationStatuses() { + this.readLock.lock(); + try { + return resourceSet.getLocalizationStatuses(); + } finally { + this.readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java index 745f8a88fb..95a8031eb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java @@ -21,6 +21,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LocalizationState; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +50,8 @@ public class ResourceSet { new ConcurrentHashMap<>(); private Map> pendingResources = new ConcurrentHashMap<>(); - private Set resourcesFailedToBeLocalized = - new HashSet<>(); + private final List resourcesFailedToBeLocalized = + new ArrayList<>(); // resources by visibility (public, private, app) private final List publicRsrcs = @@ -135,13 +137,20 @@ public Set resourceLocalized(LocalResourceRequest request, } } - public void resourceLocalizationFailed(LocalResourceRequest request) { + public void resourceLocalizationFailed(LocalResourceRequest request, + String diagnostics) { // Skip null request when localization failed for running container if (request == null) { return; } - pendingResources.remove(request); - resourcesFailedToBeLocalized.add(request); + Set keys = pendingResources.remove(request); + if (keys != null) { + synchronized (resourcesFailedToBeLocalized) { + keys.forEach(key -> + resourcesFailedToBeLocalized.add(LocalizationStatus.newInstance(key, + LocalizationState.FAILED, diagnostics))); + } + } } public synchronized Map getLocalizationStatuses() { + List statuses = new ArrayList<>(); + localizedResources.forEach((key, path) -> { + LocalizationStatus status = LocalizationStatus.newInstance(key, + LocalizationState.COMPLETED); + statuses.add(status); + }); + + pendingResources.forEach((lrReq, keys) -> + keys.forEach(key -> { + LocalizationStatus status = LocalizationStatus.newInstance(key, + LocalizationState.PENDING); + statuses.add(status); + })); + + synchronized (resourcesFailedToBeLocalized) { + statuses.addAll(resourcesFailedToBeLocalized); + } + return statuses; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index f78bb6ec8c..e215980882 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; +import org.apache.hadoop.yarn.api.records.LocalizationState; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import static org.junit.Assert.assertEquals; @@ -2179,4 +2184,127 @@ public void testStartContainerFailureWithNullVisibilityLocalResource() Assert.assertTrue(response.getFailedRequests().get(cId).getMessage() .contains("Null resource visibility for local resource")); } + + @Test + public void testGetLocalizationStatuses() throws Exception { + containerManager.start(); + ContainerId containerId = createContainerId(0, 0); + Token containerToken = + createContainerToken(containerId, DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, context.getContainerTokenSecretManager()); + + // localization resource + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + File file1 = new File(tmpDir, "file1.txt").getAbsoluteFile(); + + writeScriptFile(fileWriter, "Upgrade World!", file1, containerId, false); + + ContainerLaunchContext containerLaunchContext = + prepareContainerLaunchContext(scriptFile, "dest_file1", false, 0); + + StartContainerRequest request = StartContainerRequest.newInstance( + containerLaunchContext, containerToken); + List startRequest = new ArrayList<>(); + startRequest.add(request); + + // start container + StartContainersRequest requestList = StartContainersRequest.newInstance( + startRequest); + containerManager.startContainers(requestList); + Thread.sleep(5000); + + // Get localization statuses + GetLocalizationStatusesRequest statusRequest = + GetLocalizationStatusesRequest.newInstance( + Lists.newArrayList(containerId)); + + GetLocalizationStatusesResponse statusResponse = + containerManager.getLocalizationStatuses(statusRequest); + + Assert.assertEquals(1, statusResponse.getLocalizationStatuses() + .get(containerId).size()); + LocalizationStatus status = statusResponse.getLocalizationStatuses() + .get(containerId).iterator().next(); + Assert.assertEquals("resource key", "dest_file1", + status.getResourceKey()); + Assert.assertEquals("resource status", LocalizationState.COMPLETED, + status.getLocalizationState()); + + Assert.assertEquals(0, statusResponse.getFailedRequests().size()); + + // stop containers + StopContainersRequest stopRequest = + StopContainersRequest.newInstance(Lists.newArrayList(containerId)); + containerManager.stopContainers(stopRequest); + } + + @Test + public void testGetLocalizationStatusesMultiContainers() throws Exception { + containerManager.start(); + ContainerId container1 = createContainerId(0, 0); + ContainerId container2 = createContainerId(1, 0); + + Token containerToken1 = createContainerToken(container1, + DUMMY_RM_IDENTIFIER, context.getNodeId(), user, + context.getContainerTokenSecretManager()); + Token containerToken2 = createContainerToken(container2, + DUMMY_RM_IDENTIFIER, context.getNodeId(), user, + context.getContainerTokenSecretManager()); + + // localization resource + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + File file1 = new File(tmpDir, "file1.txt").getAbsoluteFile(); + + writeScriptFile(fileWriter, "Upgrade World!", file1, container1, false); + + ContainerLaunchContext containerLaunchContext = + prepareContainerLaunchContext(scriptFile, "dest_file1", false, 0); + + StartContainerRequest request1 = StartContainerRequest.newInstance( + containerLaunchContext, containerToken1); + StartContainerRequest request2 = StartContainerRequest.newInstance( + containerLaunchContext, containerToken2); + + List startRequest = new ArrayList<>(); + startRequest.add(request1); + startRequest.add(request2); + + // start container + StartContainersRequest requestList = StartContainersRequest.newInstance( + startRequest); + containerManager.startContainers(requestList); + Thread.sleep(5000); + + // Get localization statuses + GetLocalizationStatusesRequest statusRequest = + GetLocalizationStatusesRequest.newInstance( + Lists.newArrayList(container1, container2)); + + GetLocalizationStatusesResponse statusResponse = + containerManager.getLocalizationStatuses(statusRequest); + Assert.assertEquals(2, statusResponse.getLocalizationStatuses().size()); + + ContainerId[] containerIds = {container1, container2}; + Arrays.stream(containerIds).forEach(cntnId -> { + List statuses = statusResponse + .getLocalizationStatuses().get(container1); + Assert.assertEquals(1, statuses.size()); + LocalizationStatus status = statuses.get(0); + Assert.assertEquals("resource key", "dest_file1", + status.getResourceKey()); + Assert.assertEquals("resource status", LocalizationState.COMPLETED, + status.getLocalizationState()); + }); + + Assert.assertEquals(0, statusResponse.getFailedRequests().size()); + + // stop containers + StopContainersRequest stopRequest = + StopContainersRequest.newInstance(Lists.newArrayList(container1, + container2)); + containerManager.stopContainers(stopRequest); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceSet.java new file mode 100644 index 0000000000..12d8c84313 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceSet.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LocalizationState; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; +import org.apache.hadoop.yarn.api.records.URL; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + +/** + * Tests of {@link ResourceSet}. + */ +public class TestResourceSet { + + @Test + public void testGetPendingLS() throws URISyntaxException { + ResourceSet resourceSet = new ResourceSet(); + Map resources = new HashMap<>(); + resources.put("resource1", + LocalResource.newInstance(URL.fromPath(new Path("/tmp/file1.txt")), + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, + 0, System.currentTimeMillis())); + resourceSet.addResources(resources); + + Assert.assertEquals("num statuses", 1, + resourceSet.getLocalizationStatuses().size()); + LocalizationStatus status = resourceSet.getLocalizationStatuses() + .iterator().next(); + Assert.assertEquals("status", LocalizationState.PENDING, + status.getLocalizationState()); + } + + @Test + public void testGetCompletedLS() throws URISyntaxException { + ResourceSet resourceSet = new ResourceSet(); + Map resources = new HashMap<>(); + LocalResource resource1 = LocalResource.newInstance( + URL.fromPath(new Path("/tmp/file1.txt")), + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, + 0, System.currentTimeMillis()); + + resources.put("resource1", resource1); + resourceSet.addResources(resources); + + LocalResourceRequest lrr = new LocalResourceRequest(resource1); + resourceSet.resourceLocalized(lrr, new Path("file1.txt")); + + Assert.assertEquals("num statuses", 1, + resourceSet.getLocalizationStatuses().size()); + LocalizationStatus status = resourceSet.getLocalizationStatuses() + .iterator().next(); + Assert.assertEquals("status", LocalizationState.COMPLETED, + status.getLocalizationState()); + } + + + @Test + public void testGetFailedLS() throws URISyntaxException { + ResourceSet resourceSet = new ResourceSet(); + Map resources = new HashMap<>(); + LocalResource resource1 = LocalResource.newInstance( + URL.fromPath(new Path("/tmp/file1.txt")), + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, + 0, System.currentTimeMillis()); + + resources.put("resource1", resource1); + resourceSet.addResources(resources); + + LocalResourceRequest lrr = new LocalResourceRequest(resource1); + resourceSet.resourceLocalizationFailed(lrr, "file does not exist"); + + Assert.assertEquals("num statuses", 1, + resourceSet.getLocalizationStatuses().size()); + LocalizationStatus status = resourceSet.getLocalizationStatuses() + .iterator().next(); + Assert.assertEquals("status", LocalizationState.FAILED, + status.getLocalizationState()); + Assert.assertEquals("diagnostics", "file does not exist", + status.getDiagnostics()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index f1b39bdb96..980f29b57c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LocalizationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Dispatcher; @@ -272,4 +273,9 @@ public void sendPauseEvent(String description) { @Override public boolean isContainerInFinalStates() { return false; } + + @Override + public List getLocalizationStatuses() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index ee974e3389..440d971f20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest; @@ -370,4 +372,11 @@ public CommitResponse commitLastReInitialization(ContainerId containerId) throws YarnException, IOException { return null; } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) throws YarnException, + IOException { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index 1acf658b22..50e5865d3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -216,6 +218,13 @@ public CommitResponse commitLastReInitialization(ContainerId containerId) throws YarnException, IOException { return null; } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) throws YarnException, + IOException { + return null; + } } public static class MockRMWithAMS extends MockRMWithCustomAMLauncher { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 03ccd764b6..8fb1aa855d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -212,6 +214,13 @@ public ContainerUpdateResponse updateContainer(ContainerUpdateRequest request) throws YarnException, IOException { return null; } + + @Override + public GetLocalizationStatusesResponse getLocalizationStatuses( + GetLocalizationStatusesRequest request) throws YarnException, + IOException { + return null; + } } @Test