HDDS-1973. Implement OM RenewDelegationToken request to use Cache and DoubleBuffer. (#1316)
This commit is contained in:
parent
93595febaa
commit
217e74816c
@ -251,14 +251,30 @@ public synchronized long renewToken(Token<OzoneTokenIdentifier> token,
|
|||||||
}
|
}
|
||||||
|
|
||||||
long renewTime = Math.min(id.getMaxDate(), now + getTokenRenewInterval());
|
long renewTime = Math.min(id.getMaxDate(), now + getTokenRenewInterval());
|
||||||
try {
|
|
||||||
addToTokenStore(id, token.getPassword(), renewTime);
|
// For HA ratis will take care of updating.
|
||||||
} catch (IOException e) {
|
// This will be removed, when HA/Non-HA code is merged.
|
||||||
LOG.error("Unable to update token " + id.getSequenceNumber(), e);
|
if (!isRatisEnabled) {
|
||||||
|
try {
|
||||||
|
addToTokenStore(id, token.getPassword(), renewTime);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to update token " + id.getSequenceNumber(), e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return renewTime;
|
return renewTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateRenewToken(Token<OzoneTokenIdentifier> token,
|
||||||
|
OzoneTokenIdentifier ozoneTokenIdentifier, long expiryTime) {
|
||||||
|
//TODO: Instead of having in-memory map inside this class, we can use
|
||||||
|
// cache from table and make this table cache clean up policy NEVER. In
|
||||||
|
// this way, we don't need to maintain seperate in-memory map. To do this
|
||||||
|
// work we need to merge HA/Non-HA code.
|
||||||
|
TokenInfo tokenInfo = new TokenInfo(expiryTime, token.getPassword(),
|
||||||
|
ozoneTokenIdentifier.getTrackingId());
|
||||||
|
currentTokens.put(ozoneTokenIdentifier, tokenInfo);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cancel a token by removing it from store and cache.
|
* Cancel a token by removing it from store and cache.
|
||||||
*
|
*
|
||||||
|
@ -143,6 +143,7 @@ message OMRequest {
|
|||||||
optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
|
optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
|
||||||
optional hadoop.common.CancelDelegationTokenRequestProto cancelDelegationTokenRequest = 63;
|
optional hadoop.common.CancelDelegationTokenRequestProto cancelDelegationTokenRequest = 63;
|
||||||
optional UpdateGetDelegationTokenRequest updateGetDelegationTokenRequest = 64;
|
optional UpdateGetDelegationTokenRequest updateGetDelegationTokenRequest = 64;
|
||||||
|
optional UpdateRenewDelegationTokenRequest updatedRenewDelegationTokenRequest = 65;
|
||||||
|
|
||||||
optional GetFileStatusRequest getFileStatusRequest = 70;
|
optional GetFileStatusRequest getFileStatusRequest = 70;
|
||||||
optional CreateDirectoryRequest createDirectoryRequest = 71;
|
optional CreateDirectoryRequest createDirectoryRequest = 71;
|
||||||
@ -317,6 +318,17 @@ message UpdateGetDelegationTokenRequest {
|
|||||||
required GetDelegationTokenResponseProto getDelegationTokenResponse = 1;
|
required GetDelegationTokenResponseProto getDelegationTokenResponse = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
This will be used during OM HA, once leader renews token, sends this
|
||||||
|
request via ratis to persist to OM DB. This request will be internally used
|
||||||
|
by OM for replicating renewed token information across a quorum of OMs.
|
||||||
|
*/
|
||||||
|
message UpdateRenewDelegationTokenRequest {
|
||||||
|
required hadoop.common.RenewDelegationTokenRequestProto
|
||||||
|
renewDelegationTokenRequest = 1;
|
||||||
|
required RenewDelegationTokenResponseProto renewDelegationTokenResponse = 2;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Creates a volume
|
Creates a volume
|
||||||
*/
|
*/
|
||||||
|
@ -46,7 +46,9 @@
|
|||||||
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequest;
|
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequest;
|
||||||
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
|
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
|
||||||
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest;
|
import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest;
|
||||||
|
import org.apache.hadoop.ozone.om.request.security.OMCancelDelegationTokenRequest;
|
||||||
import org.apache.hadoop.ozone.om.request.security.OMGetDelegationTokenRequest;
|
import org.apache.hadoop.ozone.om.request.security.OMGetDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.ozone.om.request.security.OMRenewDelegationTokenRequest;
|
||||||
import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
|
import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
|
||||||
import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
|
import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
|
||||||
import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest;
|
import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest;
|
||||||
@ -136,6 +138,10 @@ public static OMClientRequest createClientRequest(OMRequest omRequest) {
|
|||||||
return getOMAclRequest(omRequest);
|
return getOMAclRequest(omRequest);
|
||||||
case GetDelegationToken:
|
case GetDelegationToken:
|
||||||
return new OMGetDelegationTokenRequest(omRequest);
|
return new OMGetDelegationTokenRequest(omRequest);
|
||||||
|
case CancelDelegationToken:
|
||||||
|
return new OMCancelDelegationTokenRequest(omRequest);
|
||||||
|
case RenewDelegationToken:
|
||||||
|
return new OMRenewDelegationTokenRequest(omRequest);
|
||||||
default:
|
default:
|
||||||
// TODO: will update once all request types are implemented.
|
// TODO: will update once all request types are implemented.
|
||||||
return null;
|
return null;
|
||||||
|
@ -0,0 +1,164 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.om.request.security;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.base.Optional;
|
||||||
|
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||||
|
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||||
|
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
|
||||||
|
import org.apache.hadoop.ozone.om.request.OMClientRequest;
|
||||||
|
import org.apache.hadoop.ozone.om.response.OMClientResponse;
|
||||||
|
import org.apache.hadoop.ozone.om.response.security.OMRenewDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenewDelegationTokenResponseProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UpdateRenewDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
|
||||||
|
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||||
|
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.utils.db.cache.CacheKey;
|
||||||
|
import org.apache.hadoop.utils.db.cache.CacheValue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle RenewDelegationToken Request.
|
||||||
|
*/
|
||||||
|
public class OMRenewDelegationTokenRequest extends OMClientRequest {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(OMRenewDelegationTokenRequest.class);
|
||||||
|
|
||||||
|
public OMRenewDelegationTokenRequest(OMRequest omRequest) {
|
||||||
|
super(omRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
|
||||||
|
RenewDelegationTokenRequestProto renewDelegationTokenRequest =
|
||||||
|
getOmRequest().getRenewDelegationTokenRequest();
|
||||||
|
|
||||||
|
// Call OM to renew token
|
||||||
|
long renewTime = ozoneManager.renewDelegationToken(
|
||||||
|
OMPBHelper.convertToDelegationToken(
|
||||||
|
renewDelegationTokenRequest.getToken()));
|
||||||
|
|
||||||
|
RenewDelegationTokenResponseProto.Builder renewResponse =
|
||||||
|
RenewDelegationTokenResponseProto.newBuilder();
|
||||||
|
|
||||||
|
renewResponse.setResponse(org.apache.hadoop.security.proto.SecurityProtos
|
||||||
|
.RenewDelegationTokenResponseProto.newBuilder()
|
||||||
|
.setNewExpiryTime(renewTime));
|
||||||
|
|
||||||
|
|
||||||
|
// Client issues RenewDelegationToken request, when received by OM leader
|
||||||
|
// it will renew the token. Original RenewDelegationToken request is
|
||||||
|
// converted to UpdateRenewDelegationToken request with the token and renew
|
||||||
|
// information. This updated request will be submitted to Ratis. In this
|
||||||
|
// way delegation token renewd by leader, will be replicated across all
|
||||||
|
// OMs. With this approach, original RenewDelegationToken request from
|
||||||
|
// client does not need any proto changes.
|
||||||
|
|
||||||
|
// Create UpdateRenewDelegationTokenRequest with original request and
|
||||||
|
// expiry time.
|
||||||
|
OMRequest.Builder omRequest = OMRequest.newBuilder()
|
||||||
|
.setUserInfo(getUserInfo())
|
||||||
|
.setUpdatedRenewDelegationTokenRequest(
|
||||||
|
UpdateRenewDelegationTokenRequest.newBuilder()
|
||||||
|
.setRenewDelegationTokenRequest(renewDelegationTokenRequest)
|
||||||
|
.setRenewDelegationTokenResponse(renewResponse))
|
||||||
|
.setCmdType(getOmRequest().getCmdType())
|
||||||
|
.setClientId(getOmRequest().getClientId());
|
||||||
|
|
||||||
|
if (getOmRequest().hasTraceID()) {
|
||||||
|
omRequest.setTraceID(getOmRequest().getTraceID());
|
||||||
|
}
|
||||||
|
|
||||||
|
return omRequest.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
|
||||||
|
long transactionLogIndex,
|
||||||
|
OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
|
||||||
|
|
||||||
|
UpdateRenewDelegationTokenRequest updateRenewDelegationTokenRequest =
|
||||||
|
getOmRequest().getUpdatedRenewDelegationTokenRequest();
|
||||||
|
|
||||||
|
Token<OzoneTokenIdentifier> ozoneTokenIdentifierToken =
|
||||||
|
OMPBHelper.convertToDelegationToken(updateRenewDelegationTokenRequest
|
||||||
|
.getRenewDelegationTokenRequest().getToken());
|
||||||
|
|
||||||
|
long renewTime = updateRenewDelegationTokenRequest
|
||||||
|
.getRenewDelegationTokenResponse().getResponse().getNewExpiryTime();
|
||||||
|
|
||||||
|
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
|
||||||
|
|
||||||
|
OMClientResponse omClientResponse = null;
|
||||||
|
OMResponse.Builder omResponse =
|
||||||
|
OMResponse.newBuilder()
|
||||||
|
.setCmdType(OzoneManagerProtocolProtos.Type.RenewDelegationToken)
|
||||||
|
.setStatus(OzoneManagerProtocolProtos.Status.OK)
|
||||||
|
.setSuccess(true);
|
||||||
|
try {
|
||||||
|
|
||||||
|
OzoneTokenIdentifier ozoneTokenIdentifier =
|
||||||
|
ozoneTokenIdentifierToken.decodeIdentifier();
|
||||||
|
|
||||||
|
// Update in memory map of token.
|
||||||
|
ozoneManager.getDelegationTokenMgr()
|
||||||
|
.updateRenewToken(ozoneTokenIdentifierToken, ozoneTokenIdentifier,
|
||||||
|
renewTime);
|
||||||
|
|
||||||
|
// Update Cache.
|
||||||
|
omMetadataManager.getDelegationTokenTable().addCacheEntry(
|
||||||
|
new CacheKey<>(ozoneTokenIdentifier),
|
||||||
|
new CacheValue<>(Optional.of(renewTime), transactionLogIndex));
|
||||||
|
|
||||||
|
omClientResponse =
|
||||||
|
new OMRenewDelegationTokenResponse(ozoneTokenIdentifier, renewTime,
|
||||||
|
omResponse.setRenewDelegationTokenResponse(
|
||||||
|
updateRenewDelegationTokenRequest
|
||||||
|
.getRenewDelegationTokenResponse()).build());
|
||||||
|
} catch (IOException ex) {
|
||||||
|
LOG.error("Error in Updating Renew DelegationToken {}",
|
||||||
|
ozoneTokenIdentifierToken, ex);
|
||||||
|
omClientResponse = new OMRenewDelegationTokenResponse(null, -1L,
|
||||||
|
createErrorOMResponse(omResponse, ex));
|
||||||
|
} finally {
|
||||||
|
if (omClientResponse != null) {
|
||||||
|
omClientResponse.setFlushFuture(
|
||||||
|
ozoneManagerDoubleBufferHelper.add(omClientResponse,
|
||||||
|
transactionLogIndex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Updated renew delegation token in-memory map: {} with expiry" +
|
||||||
|
" time {}", ozoneTokenIdentifierToken, renewTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
return omClientResponse;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,58 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.om.response.security;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||||
|
import org.apache.hadoop.ozone.om.response.OMClientResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
|
||||||
|
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||||
|
import org.apache.hadoop.utils.db.BatchOperation;
|
||||||
|
import org.apache.hadoop.utils.db.Table;
|
||||||
|
|
||||||
|
import javax.annotation.Nonnull;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle response for RenewDelegationToken request.
|
||||||
|
*/
|
||||||
|
public class OMRenewDelegationTokenResponse extends OMClientResponse {
|
||||||
|
|
||||||
|
private OzoneTokenIdentifier ozoneTokenIdentifier;
|
||||||
|
private long renewTime = -1L;
|
||||||
|
|
||||||
|
public OMRenewDelegationTokenResponse(
|
||||||
|
@Nullable OzoneTokenIdentifier ozoneTokenIdentifier,
|
||||||
|
long renewTime, @Nonnull OMResponse omResponse) {
|
||||||
|
super(omResponse);
|
||||||
|
this.ozoneTokenIdentifier = ozoneTokenIdentifier;
|
||||||
|
this.renewTime = renewTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addToDBBatch(OMMetadataManager omMetadataManager,
|
||||||
|
BatchOperation batchOperation) throws IOException {
|
||||||
|
Table table = omMetadataManager.getDelegationTokenTable();
|
||||||
|
if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
|
||||||
|
table.putWithBatch(batchOperation, ozoneTokenIdentifier, renewTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -75,6 +75,8 @@ public OMResponse handleApplyTransaction(OMRequest omRequest,
|
|||||||
case RemoveAcl:
|
case RemoveAcl:
|
||||||
case SetAcl:
|
case SetAcl:
|
||||||
case GetDelegationToken:
|
case GetDelegationToken:
|
||||||
|
case CancelDelegationToken:
|
||||||
|
case RenewDelegationToken:
|
||||||
//TODO: We don't need to pass transactionID, this will be removed when
|
//TODO: We don't need to pass transactionID, this will be removed when
|
||||||
// complete write requests is changed to new model. And also we can
|
// complete write requests is changed to new model. And also we can
|
||||||
// return OMClientResponse, then adding to doubleBuffer can be taken
|
// return OMClientResponse, then adding to doubleBuffer can be taken
|
||||||
|
Loading…
Reference in New Issue
Block a user