YARN-10841. Fix token reset synchronization for UAM response token. (#3194)

YARN-10841. Fix token reset synchronization for UAM response token.  Contributed by Minni Mittal
This commit is contained in:
minni31 2021-07-29 14:55:39 +05:30 committed by GitHub
parent 6f730fd25c
commit d78b300ed4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1413,8 +1413,8 @@ protected void mergeAllocateResponse(AllocateResponse homeResponse,
if (otherRMAddress.equals(this.homeSubClusterId)) { if (otherRMAddress.equals(this.homeSubClusterId)) {
homeResponse.setAMRMToken(otherResponse.getAMRMToken()); homeResponse.setAMRMToken(otherResponse.getAMRMToken());
} else { } else {
throw new YarnRuntimeException( LOG.warn("amrmToken from UAM {} not null, it should be null here",
"amrmToken from UAM " + otherRMAddress + " should be null here"); otherRMAddress);
} }
} }
@ -1691,6 +1691,8 @@ private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
@Override @Override
public void callback(AllocateResponse response) { public void callback(AllocateResponse response) {
org.apache.hadoop.yarn.api.records.Token amrmToken =
response.getAMRMToken();
synchronized (asyncResponseSink) { synchronized (asyncResponseSink) {
List<AllocateResponse> responses = null; List<AllocateResponse> responses = null;
if (asyncResponseSink.containsKey(subClusterId)) { if (asyncResponseSink.containsKey(subClusterId)) {
@ -1700,6 +1702,11 @@ public void callback(AllocateResponse response) {
asyncResponseSink.put(subClusterId, responses); asyncResponseSink.put(subClusterId, responses);
} }
responses.add(response); responses.add(response);
if (this.isUAM) {
// Do not further propagate the new amrmToken for UAM
response.setAMRMToken(null);
}
// Notify main thread about the response arrival // Notify main thread about the response arrival
asyncResponseSink.notifyAll(); asyncResponseSink.notifyAll();
} }
@ -1716,9 +1723,9 @@ public void callback(AllocateResponse response) {
// Save the new AMRMToken for the UAM if present // Save the new AMRMToken for the UAM if present
// Do this last because it can be slow... // Do this last because it can be slow...
if (this.isUAM && response.getAMRMToken() != null) { if (this.isUAM && amrmToken != null) {
Token<AMRMTokenIdentifier> newToken = ConverterUtils Token<AMRMTokenIdentifier> newToken = ConverterUtils
.convertFromYarn(response.getAMRMToken(), (Text) null); .convertFromYarn(amrmToken, (Text) null);
// Do not further propagate the new amrmToken for UAM // Do not further propagate the new amrmToken for UAM
response.setAMRMToken(null); response.setAMRMToken(null);