YARN-11561. [Federation] GPG Supports Format PolicyStateStore. (#6300) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
slfan1989 2023-12-03 18:36:20 +08:00 committed by GitHub
parent 2323ad24a2
commit 8745857c3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 338 additions and 2 deletions

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse;
/**
* The FederationPolicyStore provides a key-value interface to access the
@ -74,4 +76,13 @@ SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException;
/**
* Delete all queue-to-policy configurations.
*
* @param request delete request.
* @return If the response is empty, the queue-to-policy configurations are deleted successfully.
* @throws Exception if the request is invalid/fails
*/
DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations(
DeletePoliciesConfigurationsRequest request) throws Exception;
}

View File

@ -87,6 +87,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
@ -400,6 +402,13 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
}
@Override
public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations(
DeletePoliciesConfigurationsRequest request) throws Exception {
policies.clear();
return DeletePoliciesConfigurationsResponse.newInstance();
}
@Override
public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;

View File

@ -89,6 +89,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
@ -1071,6 +1073,29 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
return GetSubClusterPoliciesConfigurationsResponse.newInstance(policyConfigurations);
}
@Override
public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations(
DeletePoliciesConfigurationsRequest request) throws Exception {
Connection connection = null;
try {
connection = getConnection(false);
FederationQueryRunner runner = new FederationQueryRunner();
LOG.info("delete table = policies start.");
runner.truncateTable(connection, "policies");
LOG.info("delete table = policies finished.");
} catch (Exception e) {
throw new RuntimeException("Could not delete table (policies)!", e);
} finally {
// Return to the pool the CallableStatement
try {
FederationStateStoreUtils.returnToPool(LOG, null, connection);
} catch (YarnException e) {
LOG.error("close connection error.", e);
}
}
return DeletePoliciesConfigurationsResponse.newInstance();
}
@Override
public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;

View File

@ -91,6 +91,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
@ -786,6 +788,23 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
}
@Override
public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations(
DeletePoliciesConfigurationsRequest request) throws Exception {
zkManager.delete(policiesZNode);
try {
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(configuration);
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
} catch (Exception e) {
String errMsg = "Cannot create base directories: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
return DeletePoliciesConfigurationsResponse.newInstance();
}
@Override
public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* This class is used for handling queue policy deletion requests.
* We will delete all PoliciesConfigurations.
*/
public abstract class DeletePoliciesConfigurationsRequest {
@Private
@Unstable
public static DeletePoliciesConfigurationsRequest newInstance() {
return Records.newRecord(DeletePoliciesConfigurationsRequest.class);
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* This class is used to respond to requests to delete PoliciesConfigurations.
*/
public abstract class DeletePoliciesConfigurationsResponse {
@Private
@Unstable
public static DeletePoliciesConfigurationsResponse newInstance() {
return Records.newRecord(DeletePoliciesConfigurationsResponse.class);
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.DeletePoliciesConfigurationsRequestProto;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
@Private
@Unstable
public class DeletePoliciesConfigurationsRequestPBImpl
extends DeletePoliciesConfigurationsRequest {
private DeletePoliciesConfigurationsRequestProto proto =
DeletePoliciesConfigurationsRequestProto.getDefaultInstance();
private DeletePoliciesConfigurationsRequestProto.Builder builder = null;
private boolean viaProto = false;
public DeletePoliciesConfigurationsRequestPBImpl() {
builder = DeletePoliciesConfigurationsRequestProto.newBuilder();
}
public DeletePoliciesConfigurationsRequestPBImpl(
DeletePoliciesConfigurationsRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public DeletePoliciesConfigurationsRequestProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.DeletePoliciesConfigurationsResponseProto;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse;
@Private
@Unstable
public class DeletePoliciesConfigurationsResponsePBImpl
extends DeletePoliciesConfigurationsResponse {
private DeletePoliciesConfigurationsResponseProto proto =
DeletePoliciesConfigurationsResponseProto.getDefaultInstance();
private DeletePoliciesConfigurationsResponseProto.Builder builder = null;
private boolean viaProto = false;
public DeletePoliciesConfigurationsResponsePBImpl() {
builder = DeletePoliciesConfigurationsResponseProto.newBuilder();
}
public DeletePoliciesConfigurationsResponsePBImpl(
DeletePoliciesConfigurationsResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public DeletePoliciesConfigurationsResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -86,6 +86,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
@ -1113,6 +1114,11 @@ public ApplicationSubmissionContext getApplicationSubmissionContext(ApplicationI
}
}
public void deleteAllPoliciesConfigurations() throws Exception {
DeletePoliciesConfigurationsRequest request =
DeletePoliciesConfigurationsRequest.newInstance();
stateStore.deleteAllPoliciesConfigurations(request);
}
@VisibleForTesting
public FederationCache getFederationCache() {

View File

@ -196,6 +196,11 @@ message DeleteReservationHomeSubClusterRequestProto {
message DeleteReservationHomeSubClusterResponseProto {
}
message DeletePoliciesConfigurationsRequestProto {
}
message DeletePoliciesConfigurationsResponseProto {
}
//----- configurations ---

View File

@ -75,6 +75,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
@ -122,6 +124,7 @@ public void before() throws IOException, YarnException {
@After
public void after() throws Exception {
testDeleteStateStore();
testDeletePolicyStore();
stateStore.close();
}
@ -1136,4 +1139,25 @@ public void testDeleteStateStore() throws Exception {
assertNotNull(appsHomeSubClusters);
assertEquals(0, appsHomeSubClusters.size());
}
public void testDeletePolicyStore() throws Exception {
// Step1. We delete all Policies Configurations.
FederationStateStore federationStateStore = this.getStateStore();
DeletePoliciesConfigurationsRequest request =
DeletePoliciesConfigurationsRequest.newInstance();
DeletePoliciesConfigurationsResponse response =
federationStateStore.deleteAllPoliciesConfigurations(request);
assertNotNull(response);
// Step2. We check the Policies size, the size should be 0 at this time.
GetSubClusterPoliciesConfigurationsRequest request1 =
GetSubClusterPoliciesConfigurationsRequest.newInstance();
GetSubClusterPoliciesConfigurationsResponse response1 =
stateStore.getPoliciesConfigurations(request1);
assertNotNull(response1);
List<SubClusterPolicyConfiguration> policiesConfigs =
response1.getPoliciesConfigs();
assertNotNull(policiesConfigs);
assertEquals(0, policiesConfigs.size());
}
}

View File

@ -340,8 +340,7 @@ public static void main(String[] argv) {
argv = hParser.getRemainingArgs();
if (argv.length > 1) {
if (argv[0].equals("-format-policy-store")) {
// TODO: YARN-11561. [Federation] GPG Supports Format PolicyStateStore.
System.err.println("format-policy-store is not yet supported.");
handFormatPolicyStateStore(conf);
} else {
printUsage(System.err);
}
@ -366,4 +365,16 @@ public WebApp getWebApp() {
private static void printUsage(PrintStream out) {
out.println("Usage: yarn gpg [-format-policy-store]");
}
private static void handFormatPolicyStateStore(Configuration conf) {
try {
System.out.println("Deleting Federation policy state store.");
FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(conf);
System.out.println("Federation policy state store has been cleaned.");
facade.deleteAllPoliciesConfigurations();
} catch (Exception e) {
LOG.error("Delete Federation policy state store error.", e);
System.err.println("Delete Federation policy state store error, exception = " + e);
}
}
}

View File

@ -45,6 +45,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeletePoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
@ -322,6 +324,16 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
return clientMethod.invoke();
}
@Override
public DeletePoliciesConfigurationsResponse deleteAllPoliciesConfigurations(
DeletePoliciesConfigurationsRequest request) throws Exception {
FederationClientMethod<DeletePoliciesConfigurationsResponse> clientMethod =
new FederationClientMethod<>("deleteAllPoliciesConfigurations",
DeletePoliciesConfigurationsRequest.class, request,
DeletePoliciesConfigurationsResponse.class, stateStoreClient, clock);
return clientMethod.invoke();
}
@Override
public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request)
throws YarnException {