HDDS-1250. In OM HA AllocateBlock call where connecting to SCM from OM should not happen on Ratis.
This commit is contained in:
parent
c0427c84dd
commit
93db5da4d9
@ -24,6 +24,7 @@ public enum OMAction implements AuditAction {
|
||||
|
||||
// WRITE Actions
|
||||
ALLOCATE_BLOCK,
|
||||
ADD_ALLOCATE_BLOCK,
|
||||
ALLOCATE_KEY,
|
||||
COMMIT_KEY,
|
||||
CREATE_VOLUME,
|
||||
|
@ -0,0 +1,55 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om.protocol;
|
||||
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||
.KeyLocation;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Protocol to talk to OM HA. These methods are needed only called from
|
||||
* OmRequestHandler.
|
||||
*/
|
||||
public interface OzoneManagerHAProtocol {
|
||||
|
||||
/**
|
||||
* Add a allocate block, it is assumed that the client is having an open
|
||||
* key session going on. This block will be appended to this open key session.
|
||||
* This will be called only during HA enabled OM, as during HA we get an
|
||||
* allocated Block information, and add that information to OM DB.
|
||||
*
|
||||
* In HA the flow for allocateBlock is in StartTransaction allocateBlock
|
||||
* will be called which returns block information, and in the
|
||||
* applyTransaction addAllocateBlock will be called to add the block
|
||||
* information to DB.
|
||||
*
|
||||
* @param args the key to append
|
||||
* @param clientID the client identification
|
||||
* @param keyLocation key location given by allocateBlock
|
||||
* @return an allocated block
|
||||
* @throws IOException
|
||||
*/
|
||||
OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
|
||||
KeyLocation keyLocation) throws IOException;
|
||||
|
||||
|
||||
}
|
@ -190,6 +190,7 @@ OmBucketInfo getBucketInfo(String volumeName, String bucketName)
|
||||
OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
|
||||
ExcludeList excludeList) throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* Look up for the container of an existing key.
|
||||
*
|
||||
|
@ -0,0 +1,28 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om.protocol;
|
||||
|
||||
/**
|
||||
* This will be used in the OzoneManager Server, as few of the methods in
|
||||
* OzoneManagerHAProtocol need not be exposed to Om clients. This interface
|
||||
* extends both OzoneManagerHAProtocol and OzoneManagerProtocol.
|
||||
*/
|
||||
public interface OzoneManagerServerProtocol extends OzoneManagerProtocol,
|
||||
OzoneManagerHAProtocol {
|
||||
}
|
@ -719,7 +719,6 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId,
|
||||
.getAllocateBlockResponse();
|
||||
return OmKeyLocationInfo.getFromProtobuf(resp.getKeyLocation());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitKey(OmKeyArgs args, long clientId)
|
||||
throws IOException {
|
||||
|
@ -628,6 +628,10 @@ message AllocateBlockRequest {
|
||||
required KeyArgs keyArgs = 1;
|
||||
required uint64 clientID = 2;
|
||||
optional hadoop.hdds.ExcludeListProto excludeList = 3;
|
||||
// During HA on one of the OM nodes, we allocate block and send the
|
||||
// AllocateBlockRequest with keyLocation set. If this is set, no need to
|
||||
// call scm again in OM Ratis applyTransaction just append it to DB.
|
||||
optional KeyLocation keyLocation = 4;
|
||||
}
|
||||
|
||||
message AllocateBlockResponse {
|
||||
|
@ -149,7 +149,8 @@ public void testAllocateCommit() throws Exception {
|
||||
|
||||
// this block will be appended to the latest version of version 2.
|
||||
OmKeyLocationInfo locationInfo =
|
||||
ozoneManager.allocateBlock(keyArgs, openKey.getId(), new ExcludeList());
|
||||
ozoneManager.allocateBlock(keyArgs, openKey.getId(),
|
||||
new ExcludeList());
|
||||
List<OmKeyLocationInfo> locationInfoList =
|
||||
openKey.getKeyInfo().getLatestVersionLocations()
|
||||
.getBlocksLatestVersionOnly();
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -75,6 +76,20 @@ public interface KeyManager {
|
||||
*/
|
||||
OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
|
||||
ExcludeList excludeList) throws IOException;
|
||||
|
||||
/**
|
||||
* Ozone manager state machine call's this on an open key, to add allocated
|
||||
* block to the tail of current block list of the open client.
|
||||
*
|
||||
* @param args the key to append
|
||||
* @param clientID the client requesting block.
|
||||
* @param keyLocation key location.
|
||||
* @return the reference to the new block.
|
||||
* @throws IOException
|
||||
*/
|
||||
OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
|
||||
OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException;
|
||||
|
||||
/**
|
||||
* Given the args of a key to put, write an open key entry to meta data.
|
||||
*
|
||||
|
@ -63,6 +63,7 @@
|
||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
||||
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
@ -120,6 +121,8 @@ public class KeyManagerImpl implements KeyManager {
|
||||
|
||||
private final KeyProviderCryptoExtension kmsProvider;
|
||||
|
||||
private final boolean isRatisEnabled;
|
||||
|
||||
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
|
||||
OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
|
||||
OzoneBlockTokenSecretManager secretManager) {
|
||||
@ -148,6 +151,9 @@ public KeyManagerImpl(ScmClient scmClient,
|
||||
HDDS_BLOCK_TOKEN_ENABLED,
|
||||
HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
|
||||
this.kmsProvider = kmsProvider;
|
||||
this.isRatisEnabled = conf.getBoolean(
|
||||
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
|
||||
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -223,10 +229,41 @@ private void validateS3Bucket(String volumeName, String bucketName)
|
||||
}
|
||||
|
||||
@Override
|
||||
public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
|
||||
ExcludeList excludeList)
|
||||
throws IOException {
|
||||
public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
|
||||
OzoneManagerProtocolProtos.KeyLocation keyLocation) throws IOException {
|
||||
Preconditions.checkNotNull(args);
|
||||
Preconditions.checkNotNull(keyLocation);
|
||||
|
||||
|
||||
String volumeName = args.getVolumeName();
|
||||
String bucketName = args.getBucketName();
|
||||
String keyName = args.getKeyName();
|
||||
validateBucket(volumeName, bucketName);
|
||||
String openKey = metadataManager.getOpenKey(
|
||||
volumeName, bucketName, keyName, clientID);
|
||||
|
||||
OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
|
||||
if (keyInfo == null) {
|
||||
LOG.error("Allocate block for a key not in open status in meta store" +
|
||||
" /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
|
||||
throw new OMException("Open Key not found",
|
||||
OMException.ResultCodes.KEY_NOT_FOUND);
|
||||
}
|
||||
|
||||
OmKeyLocationInfo omKeyLocationInfo =
|
||||
OmKeyLocationInfo.getFromProtobuf(keyLocation);
|
||||
keyInfo.appendNewBlocks(Collections.singletonList(omKeyLocationInfo));
|
||||
keyInfo.updateModifcationTime();
|
||||
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
|
||||
return omKeyLocationInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
|
||||
ExcludeList excludeList) throws IOException {
|
||||
Preconditions.checkNotNull(args);
|
||||
|
||||
|
||||
String volumeName = args.getVolumeName();
|
||||
String bucketName = args.getBucketName();
|
||||
String keyName = args.getKeyName();
|
||||
@ -246,11 +283,16 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
|
||||
// the same version
|
||||
List<OmKeyLocationInfo> locationInfos =
|
||||
allocateBlock(keyInfo, excludeList, scmBlockSize);
|
||||
keyInfo.appendNewBlocks(locationInfos);
|
||||
keyInfo.updateModifcationTime();
|
||||
metadataManager.getOpenKeyTable().put(openKey,
|
||||
keyInfo);
|
||||
|
||||
// If om is not managing via ratis, write to db, otherwise write to DB
|
||||
// will happen via ratis apply transaction.
|
||||
if (!isRatisEnabled) {
|
||||
keyInfo.appendNewBlocks(locationInfos);
|
||||
keyInfo.updateModifcationTime();
|
||||
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
|
||||
}
|
||||
return locationInfos.get(0);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -60,6 +60,7 @@ public class OMMetrics {
|
||||
private @Metric MutableCounterLong numVolumeLists;
|
||||
private @Metric MutableCounterLong numKeyCommits;
|
||||
private @Metric MutableCounterLong numAllocateBlockCalls;
|
||||
private @Metric MutableCounterLong numAddAllocateBlockCalls;
|
||||
private @Metric MutableCounterLong numGetServiceLists;
|
||||
private @Metric MutableCounterLong numListS3Buckets;
|
||||
private @Metric MutableCounterLong numInitiateMultipartUploads;
|
||||
@ -85,6 +86,7 @@ public class OMMetrics {
|
||||
private @Metric MutableCounterLong numVolumeListFails;
|
||||
private @Metric MutableCounterLong numKeyCommitFails;
|
||||
private @Metric MutableCounterLong numBlockAllocateCallFails;
|
||||
private @Metric MutableCounterLong numAddAllocateBlockCallFails;
|
||||
private @Metric MutableCounterLong numGetServiceListFails;
|
||||
private @Metric MutableCounterLong numListS3BucketsFails;
|
||||
private @Metric MutableCounterLong numInitiateMultipartUploadFails;
|
||||
@ -379,6 +381,14 @@ public void incNumBlockAllocateCallFails() {
|
||||
numBlockAllocateCallFails.incr();
|
||||
}
|
||||
|
||||
public void incNumAddAllocateBlockCalls() {
|
||||
numAddAllocateBlockCalls.incr();
|
||||
}
|
||||
|
||||
public void incNumAddAllocateBlockFails() {
|
||||
numAddAllocateBlockCallFails.incr();
|
||||
}
|
||||
|
||||
public void incNumBucketListFails() {
|
||||
numBucketListFails.incr();
|
||||
}
|
||||
|
@ -72,6 +72,8 @@
|
||||
import org.apache.hadoop.ozone.OzoneSecurityUtil;
|
||||
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
|
||||
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
|
||||
import org.apache.hadoop.ozone.security.OzoneSecurityException;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
@ -102,7 +104,6 @@
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
|
||||
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
|
||||
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
|
||||
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
|
||||
@ -205,7 +206,7 @@
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
|
||||
public final class OzoneManager extends ServiceRuntimeInfoImpl
|
||||
implements OzoneManagerProtocol, OMMXBean, Auditor {
|
||||
implements OzoneManagerServerProtocol, OMMXBean, Auditor {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(OzoneManager.class);
|
||||
|
||||
@ -2036,6 +2037,35 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
|
||||
KeyLocation keyLocation) throws IOException {
|
||||
if(isAclEnabled) {
|
||||
checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
|
||||
args.getVolumeName(), args.getBucketName(), args.getKeyName());
|
||||
}
|
||||
boolean auditSuccess = true;
|
||||
Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
|
||||
args.toAuditMap();
|
||||
auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
|
||||
try {
|
||||
metrics.incNumAddAllocateBlockCalls();
|
||||
return keyManager.addAllocatedBlock(args, clientID, keyLocation);
|
||||
} catch (Exception ex) {
|
||||
metrics.incNumAddAllocateBlockFails();
|
||||
auditSuccess = false;
|
||||
AUDIT.logWriteFailure(buildAuditMessageForFailure(
|
||||
OMAction.ADD_ALLOCATE_BLOCK, auditMap, ex));
|
||||
throw ex;
|
||||
} finally {
|
||||
if(auditSuccess){
|
||||
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
|
||||
OMAction.ADD_ALLOCATE_BLOCK, auditMap));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Lookup a key.
|
||||
*
|
||||
|
@ -40,7 +40,8 @@
|
||||
import org.apache.hadoop.hdds.scm.HddsServerUtil;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.om.OMNodeDetails;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
|
||||
import org.apache.ratis.RaftConfigKeys;
|
||||
import org.apache.ratis.client.RaftClientConfigKeys;
|
||||
import org.apache.ratis.conf.RaftProperties;
|
||||
@ -81,7 +82,7 @@ public final class OzoneManagerRatisServer {
|
||||
private final RaftGroup raftGroup;
|
||||
private final RaftPeerId raftPeerId;
|
||||
|
||||
private final OzoneManagerProtocol ozoneManager;
|
||||
private final OzoneManagerServerProtocol ozoneManager;
|
||||
private final ClientId clientId = ClientId.randomId();
|
||||
|
||||
private final ScheduledExecutorService scheduledRoleChecker;
|
||||
@ -107,7 +108,8 @@ private static long nextCallId() {
|
||||
* @param raftPeers peer nodes in the raft ring
|
||||
* @throws IOException
|
||||
*/
|
||||
private OzoneManagerRatisServer(Configuration conf, OzoneManagerProtocol om,
|
||||
private OzoneManagerRatisServer(Configuration conf,
|
||||
OzoneManagerServerProtocol om,
|
||||
String raftGroupIdStr, RaftPeerId localRaftPeerId,
|
||||
InetSocketAddress addr, List<RaftPeer> raftPeers)
|
||||
throws IOException {
|
||||
@ -154,7 +156,7 @@ public void run() {
|
||||
* Creates an instance of OzoneManagerRatisServer.
|
||||
*/
|
||||
public static OzoneManagerRatisServer newOMRatisServer(
|
||||
Configuration ozoneConf, OzoneManagerProtocol om,
|
||||
Configuration ozoneConf, OzoneManager om,
|
||||
OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes)
|
||||
throws IOException {
|
||||
|
||||
@ -199,7 +201,7 @@ private BaseStateMachine getStateMachine(RaftGroupId gid) {
|
||||
return new OzoneManagerStateMachine(this);
|
||||
}
|
||||
|
||||
public OzoneManagerProtocol getOzoneManager() {
|
||||
public OzoneManagerServerProtocol getOzoneManager() {
|
||||
return ozoneManager;
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.hadoop.ozone.om.ratis;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import java.io.IOException;
|
||||
@ -26,12 +27,14 @@
|
||||
.ContainerStateMachine;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||
.OMRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||
.OMResponse;
|
||||
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
|
||||
import org.apache.hadoop.ozone.protocolPB.RequestHandler;
|
||||
import org.apache.ratis.proto.RaftProtos;
|
||||
import org.apache.ratis.protocol.Message;
|
||||
import org.apache.ratis.protocol.RaftClientRequest;
|
||||
@ -57,8 +60,8 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
|
||||
private final SimpleStateMachineStorage storage =
|
||||
new SimpleStateMachineStorage();
|
||||
private final OzoneManagerRatisServer omRatisServer;
|
||||
private final OzoneManagerProtocol ozoneManager;
|
||||
private final OzoneManagerRequestHandler handler;
|
||||
private final OzoneManagerServerProtocol ozoneManager;
|
||||
private RequestHandler handler;
|
||||
private RaftGroupId raftGroupId;
|
||||
|
||||
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
|
||||
@ -105,6 +108,11 @@ public TransactionContext startTransaction(
|
||||
ctxt.setException(ioe);
|
||||
return ctxt;
|
||||
}
|
||||
|
||||
if (omRequest.getCmdType() ==
|
||||
OzoneManagerProtocolProtos.Type.AllocateBlock) {
|
||||
return handleAllocateBlock(raftClientRequest, omRequest);
|
||||
}
|
||||
return TransactionContext.newBuilder()
|
||||
.setClientRequest(raftClientRequest)
|
||||
.setStateMachine(this)
|
||||
@ -113,6 +121,66 @@ public TransactionContext startTransaction(
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle AllocateBlock Request, which needs a special handling. This
|
||||
* request needs to be executed on the leader, where it connects to SCM and
|
||||
* get Block information.
|
||||
* @param raftClientRequest
|
||||
* @param omRequest
|
||||
* @return TransactionContext
|
||||
*/
|
||||
private TransactionContext handleAllocateBlock(
|
||||
RaftClientRequest raftClientRequest, OMRequest omRequest) {
|
||||
OMResponse omResponse = handler.handle(omRequest);
|
||||
|
||||
|
||||
// If request is failed, no need to proceed further.
|
||||
// Setting the exception with omResponse message and code.
|
||||
|
||||
// TODO: the allocate block fails when scm is in chill mode or when scm is
|
||||
// down, but that error is not correctly received in OM end, once that
|
||||
// is fixed, we need to see how to handle this failure case or how we
|
||||
// need to retry or how to handle this scenario. For other errors like
|
||||
// KEY_NOT_FOUND, we don't need a retry/
|
||||
if (!omResponse.getSuccess()) {
|
||||
TransactionContext transactionContext = TransactionContext.newBuilder()
|
||||
.setClientRequest(raftClientRequest)
|
||||
.setStateMachine(this)
|
||||
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
|
||||
.build();
|
||||
IOException ioe = new IOException(omResponse.getMessage() +
|
||||
" Status code " + omResponse.getStatus());
|
||||
transactionContext.setException(ioe);
|
||||
return transactionContext;
|
||||
}
|
||||
|
||||
|
||||
// Get original request
|
||||
OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest =
|
||||
omRequest.getAllocateBlockRequest();
|
||||
|
||||
// Create new AllocateBlockRequest with keyLocation set.
|
||||
OzoneManagerProtocolProtos.AllocateBlockRequest newAllocateBlockRequest =
|
||||
OzoneManagerProtocolProtos.AllocateBlockRequest.newBuilder().
|
||||
mergeFrom(allocateBlockRequest)
|
||||
.setKeyLocation(
|
||||
omResponse.getAllocateBlockResponse().getKeyLocation()).build();
|
||||
|
||||
OMRequest newOmRequest = omRequest.toBuilder().setCmdType(
|
||||
OzoneManagerProtocolProtos.Type.AllocateBlock)
|
||||
.setAllocateBlockRequest(newAllocateBlockRequest).build();
|
||||
|
||||
ByteString messageContent = ByteString.copyFrom(newOmRequest.toByteArray());
|
||||
|
||||
return TransactionContext.newBuilder()
|
||||
.setClientRequest(raftClientRequest)
|
||||
.setStateMachine(this)
|
||||
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
|
||||
.setLogData(messageContent)
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Apply a committed log entry to the state machine.
|
||||
*/
|
||||
@ -169,4 +237,14 @@ private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
|
||||
return future;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setHandler(RequestHandler handler) {
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setRaftGroupId(RaftGroupId raftGroupId) {
|
||||
this.raftGroupId = raftGroupId;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -19,7 +19,7 @@
|
||||
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
|
||||
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
|
||||
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
|
||||
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
|
||||
@ -46,7 +46,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
|
||||
.getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);
|
||||
private final OzoneManagerRatisServer omRatisServer;
|
||||
private final OzoneManagerRatisClient omRatisClient;
|
||||
private final OzoneManagerRequestHandler handler;
|
||||
private final RequestHandler handler;
|
||||
private final boolean isRatisEnabled;
|
||||
|
||||
/**
|
||||
@ -55,7 +55,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
|
||||
* @param impl OzoneManagerProtocolPB
|
||||
*/
|
||||
public OzoneManagerProtocolServerSideTranslatorPB(
|
||||
OzoneManagerProtocol impl, OzoneManagerRatisServer ratisServer,
|
||||
OzoneManagerServerProtocol impl, OzoneManagerRatisServer ratisServer,
|
||||
OzoneManagerRatisClient ratisClient, boolean enableRatis) {
|
||||
handler = new OzoneManagerRequestHandler(impl);
|
||||
this.omRatisServer = ratisServer;
|
||||
@ -89,7 +89,6 @@ public OMResponse submitRequest(RpcController controller,
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits request to OM's Ratis server.
|
||||
*/
|
||||
|
@ -41,7 +41,7 @@
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse;
|
||||
@ -123,17 +123,18 @@
|
||||
* Command Handler for OM requests. OM State Machine calls this handler for
|
||||
* deserializing the client request and sending it to OM.
|
||||
*/
|
||||
public class OzoneManagerRequestHandler {
|
||||
public class OzoneManagerRequestHandler implements RequestHandler {
|
||||
static final Logger LOG =
|
||||
LoggerFactory.getLogger(OzoneManagerRequestHandler.class);
|
||||
private final OzoneManagerProtocol impl;
|
||||
private final OzoneManagerServerProtocol impl;
|
||||
|
||||
public OzoneManagerRequestHandler(OzoneManagerProtocol om) {
|
||||
public OzoneManagerRequestHandler(OzoneManagerServerProtocol om) {
|
||||
this.impl = om;
|
||||
}
|
||||
|
||||
//TODO simplify it to make it shorter
|
||||
@SuppressWarnings("methodlength")
|
||||
@Override
|
||||
public OMResponse handle(OMRequest request) {
|
||||
LOG.debug("Received OMRequest: {}, ", request);
|
||||
Type cmdType = request.getCmdType();
|
||||
@ -344,6 +345,7 @@ private Status exceptionToResponseStatus(IOException ex) {
|
||||
* @param omRequest client request to OM
|
||||
* @throws OMException thrown if required parameters are set to null.
|
||||
*/
|
||||
@Override
|
||||
public void validateRequest(OMRequest omRequest) throws OMException {
|
||||
Type cmdType = omRequest.getCmdType();
|
||||
if (cmdType == null) {
|
||||
@ -627,9 +629,18 @@ private AllocateBlockResponse allocateBlock(AllocateBlockRequest request)
|
||||
.setBucketName(keyArgs.getBucketName())
|
||||
.setKeyName(keyArgs.getKeyName())
|
||||
.build();
|
||||
OmKeyLocationInfo newLocation =
|
||||
impl.allocateBlock(omKeyArgs, request.getClientID(),
|
||||
ExcludeList.getFromProtoBuf(request.getExcludeList()));
|
||||
|
||||
OmKeyLocationInfo newLocation;
|
||||
if (request.hasKeyLocation()) {
|
||||
newLocation =
|
||||
impl.addAllocatedBlock(omKeyArgs, request.getClientID(),
|
||||
request.getKeyLocation());
|
||||
} else {
|
||||
newLocation =
|
||||
impl.allocateBlock(omKeyArgs, request.getClientID(),
|
||||
ExcludeList.getFromProtoBuf(request.getExcludeList()));
|
||||
}
|
||||
|
||||
resp.setKeyLocation(newLocation.getProtobuf());
|
||||
|
||||
return resp.build();
|
||||
|
@ -0,0 +1,31 @@
|
||||
package org.apache.hadoop.ozone.protocolPB;
|
||||
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
|
||||
OMRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
|
||||
OMResponse;
|
||||
|
||||
/**
|
||||
* Handler to handle the OmRequests.
|
||||
*/
|
||||
public interface RequestHandler {
|
||||
|
||||
|
||||
/**
|
||||
* Handle the OmRequest, and returns OmResponse.
|
||||
* @param request
|
||||
* @return OmResponse
|
||||
*/
|
||||
OMResponse handle(OMRequest request);
|
||||
|
||||
/**
|
||||
* Validates that the incoming OM request has required parameters.
|
||||
* TODO: Add more validation checks before writing the request to Ratis log.
|
||||
*
|
||||
* @param omRequest client request to OM
|
||||
* @throws OMException thrown if required parameters are set to null.
|
||||
*/
|
||||
void validateRequest(OMRequest omRequest) throws OMException;
|
||||
|
||||
}
|
@ -0,0 +1,264 @@
|
||||
package org.apache.hadoop.ozone.om.ratis;
|
||||
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.om.OMNodeDetails;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||
.AllocateBlockRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||
.KeyArgs;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||
.OMRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||
.OMResponse;
|
||||
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
|
||||
import org.apache.hadoop.ozone.protocolPB.RequestHandler;
|
||||
import org.apache.ratis.proto.RaftProtos;
|
||||
import org.apache.ratis.protocol.ClientId;
|
||||
import org.apache.ratis.protocol.Message;
|
||||
import org.apache.ratis.protocol.RaftClientRequest;
|
||||
import org.apache.ratis.protocol.RaftGroupId;
|
||||
import org.apache.ratis.protocol.RaftPeerId;
|
||||
import org.apache.ratis.statemachine.TransactionContext;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* This class tests OzoneManagerStateMachine.
|
||||
*/
|
||||
public class TestOzoneManagerStateMachine {
|
||||
|
||||
private OzoneConfiguration conf;
|
||||
private OzoneManagerRatisServer omRatisServer;
|
||||
private String omID;
|
||||
private RequestHandler requestHandler;
|
||||
private RaftGroupId raftGroupId;
|
||||
private OzoneManagerStateMachine ozoneManagerStateMachine;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
conf = new OzoneConfiguration();
|
||||
omID = UUID.randomUUID().toString();
|
||||
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
|
||||
temporaryFolder.newFolder().toString());
|
||||
int ratisPort = conf.getInt(
|
||||
OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
|
||||
OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
|
||||
InetSocketAddress rpcAddress = new InetSocketAddress(
|
||||
InetAddress.getLocalHost(), 0);
|
||||
OMNodeDetails omNodeDetails = new OMNodeDetails.Builder()
|
||||
.setRpcAddress(rpcAddress)
|
||||
.setRatisPort(ratisPort)
|
||||
.setOMNodeId(omID)
|
||||
.setOMServiceId(OzoneConsts.OM_SERVICE_ID_DEFAULT)
|
||||
.build();
|
||||
// Starts a single node Ratis server
|
||||
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, null,
|
||||
omNodeDetails, Collections.emptyList());
|
||||
|
||||
|
||||
ozoneManagerStateMachine =
|
||||
new OzoneManagerStateMachine(omRatisServer);
|
||||
|
||||
requestHandler = Mockito.mock(OzoneManagerRequestHandler.class);
|
||||
raftGroupId = omRatisServer.getRaftGroup().getGroupId();
|
||||
|
||||
ozoneManagerStateMachine.setHandler(requestHandler);
|
||||
ozoneManagerStateMachine.setRaftGroupId(raftGroupId);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocateBlockRequestWithSuccess() throws Exception {
|
||||
|
||||
String volumeName = UUID.randomUUID().toString();
|
||||
String bucketName = UUID.randomUUID().toString();
|
||||
String keyName = UUID.randomUUID().toString();
|
||||
long allocateBlockClientId = RandomUtils.nextLong();
|
||||
String clientId = UUID.randomUUID().toString();
|
||||
|
||||
|
||||
OMRequest omRequest = createOmRequestForAllocateBlock(volumeName,
|
||||
bucketName, keyName, allocateBlockClientId, clientId);
|
||||
|
||||
OzoneManagerProtocolProtos.OMResponse omResponse =
|
||||
createOmResponseForAllocateBlock(true);
|
||||
|
||||
when(requestHandler.handle(omRequest)).thenReturn(omResponse);
|
||||
|
||||
|
||||
RaftClientRequest raftClientRequest =
|
||||
new RaftClientRequest(ClientId.randomId(),
|
||||
RaftPeerId.valueOf("random"), raftGroupId, 1, 1,
|
||||
Message.valueOf(
|
||||
OMRatisHelper.convertRequestToByteString(omRequest)),
|
||||
RaftClientRequest.Type.valueOf(
|
||||
RaftProtos.WriteRequestTypeProto.getDefaultInstance()));
|
||||
|
||||
TransactionContext transactionContext =
|
||||
ozoneManagerStateMachine.startTransaction(raftClientRequest);
|
||||
|
||||
|
||||
OMRequest newOmRequest = OMRatisHelper.convertByteStringToOMRequest(
|
||||
transactionContext.getStateMachineLogEntry().getLogData());
|
||||
|
||||
Assert.assertTrue(newOmRequest.hasAllocateBlockRequest());
|
||||
checkModifiedOmRequest(omRequest, newOmRequest);
|
||||
|
||||
// Check this keyLocation, and the keyLocation is same as from OmResponse.
|
||||
Assert.assertTrue(newOmRequest.getAllocateBlockRequest().hasKeyLocation());
|
||||
|
||||
Assert.assertEquals(omResponse.getAllocateBlockResponse().getKeyLocation(),
|
||||
newOmRequest.getAllocateBlockRequest().getKeyLocation());
|
||||
|
||||
}
|
||||
|
||||
|
||||
private OMRequest createOmRequestForAllocateBlock(String volumeName,
|
||||
String bucketName, String keyName, long allocateClientId,
|
||||
String clientId) {
|
||||
//Create AllocateBlockRequest
|
||||
AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder();
|
||||
KeyArgs keyArgs = KeyArgs.newBuilder()
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setDataSize(100).build();
|
||||
req.setKeyArgs(keyArgs);
|
||||
req.setClientID(allocateClientId);
|
||||
req.setExcludeList(HddsProtos.ExcludeListProto.getDefaultInstance());
|
||||
return OMRequest.newBuilder()
|
||||
.setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock)
|
||||
.setAllocateBlockRequest(req)
|
||||
.setClientId(clientId)
|
||||
.build();
|
||||
}
|
||||
|
||||
private OMResponse createOmResponseForAllocateBlock(boolean status) {
|
||||
OmKeyLocationInfo newLocation = new OmKeyLocationInfo.Builder().setBlockID(
|
||||
new BlockID(RandomUtils.nextLong(),
|
||||
RandomUtils.nextLong()))
|
||||
.setLength(RandomUtils.nextLong())
|
||||
.setOffset(0).setPipeline(
|
||||
Pipeline.newBuilder().setId(PipelineID.randomId())
|
||||
.setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE)
|
||||
.setState(Pipeline.PipelineState.OPEN)
|
||||
.setNodes(new ArrayList<>()).build()).build();
|
||||
|
||||
OzoneManagerProtocolProtos.AllocateBlockResponse.Builder resp =
|
||||
OzoneManagerProtocolProtos.AllocateBlockResponse.newBuilder();
|
||||
resp.setKeyLocation(newLocation.getProtobuf());
|
||||
|
||||
|
||||
if (status) {
|
||||
return OzoneManagerProtocolProtos.OMResponse.newBuilder().setSuccess(true)
|
||||
.setAllocateBlockResponse(resp)
|
||||
.setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock)
|
||||
.setStatus(OzoneManagerProtocolProtos.Status.OK)
|
||||
.setSuccess(status).build();
|
||||
} else {
|
||||
return OzoneManagerProtocolProtos.OMResponse.newBuilder().setSuccess(true)
|
||||
.setAllocateBlockResponse(resp)
|
||||
.setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock)
|
||||
.setStatus(OzoneManagerProtocolProtos.Status.SCM_IN_CHILL_MODE)
|
||||
.setMessage("Scm in Chill mode")
|
||||
.setSuccess(status).build();
|
||||
}
|
||||
|
||||
}
|
||||
@Test
|
||||
public void testAllocateBlockWithFailure() throws Exception{
|
||||
String volumeName = UUID.randomUUID().toString();
|
||||
String bucketName = UUID.randomUUID().toString();
|
||||
String keyName = UUID.randomUUID().toString();
|
||||
long allocateBlockClientId = RandomUtils.nextLong();
|
||||
String clientId = UUID.randomUUID().toString();
|
||||
|
||||
|
||||
OMRequest omRequest = createOmRequestForAllocateBlock(volumeName,
|
||||
bucketName, keyName, allocateBlockClientId, clientId);
|
||||
|
||||
OzoneManagerProtocolProtos.OMResponse omResponse =
|
||||
createOmResponseForAllocateBlock(false);
|
||||
|
||||
when(requestHandler.handle(omRequest)).thenReturn(omResponse);
|
||||
|
||||
|
||||
RaftClientRequest raftClientRequest =
|
||||
new RaftClientRequest(ClientId.randomId(),
|
||||
RaftPeerId.valueOf("random"), raftGroupId, 1, 1,
|
||||
Message.valueOf(
|
||||
OMRatisHelper.convertRequestToByteString(omRequest)),
|
||||
RaftClientRequest.Type.valueOf(
|
||||
RaftProtos.WriteRequestTypeProto.getDefaultInstance()));
|
||||
|
||||
TransactionContext transactionContext =
|
||||
ozoneManagerStateMachine.startTransaction(raftClientRequest);
|
||||
|
||||
|
||||
OMRequest newOmRequest = OMRatisHelper.convertByteStringToOMRequest(
|
||||
transactionContext.getStateMachineLogEntry().getLogData());
|
||||
|
||||
Assert.assertTrue(newOmRequest.hasAllocateBlockRequest());
|
||||
checkModifiedOmRequest(omRequest, newOmRequest);
|
||||
|
||||
// As the request failed, check for keyLocation and the transaction
|
||||
// context error message
|
||||
Assert.assertFalse(newOmRequest.getAllocateBlockRequest().hasKeyLocation());
|
||||
Assert.assertEquals("Scm in Chill mode Status code "
|
||||
+ OMException.ResultCodes.SCM_IN_CHILL_MODE,
|
||||
transactionContext.getException().getMessage());
|
||||
Assert.assertTrue(transactionContext.getException() instanceof IOException);
|
||||
|
||||
}
|
||||
|
||||
private void checkModifiedOmRequest(OMRequest omRequest,
|
||||
OMRequest newOmRequest) {
|
||||
Assert.assertTrue(newOmRequest.getAllocateBlockRequest()
|
||||
.getKeyArgs().getBucketName().equals(
|
||||
omRequest.getAllocateBlockRequest().getKeyArgs().getBucketName()));
|
||||
Assert.assertTrue(omRequest.getAllocateBlockRequest()
|
||||
.getKeyArgs().getVolumeName().equals(
|
||||
newOmRequest.getAllocateBlockRequest().getKeyArgs()
|
||||
.getVolumeName()));
|
||||
Assert.assertTrue(omRequest.getAllocateBlockRequest()
|
||||
.getKeyArgs().getKeyName().equals(
|
||||
newOmRequest.getAllocateBlockRequest().getKeyArgs().getKeyName()));
|
||||
Assert.assertEquals(omRequest.getAllocateBlockRequest()
|
||||
.getKeyArgs().getDataSize(),
|
||||
newOmRequest.getAllocateBlockRequest().getKeyArgs().getDataSize());
|
||||
Assert.assertEquals(omRequest.getAllocateBlockRequest()
|
||||
.getClientID(),
|
||||
newOmRequest.getAllocateBlockRequest().getClientID());
|
||||
Assert.assertEquals(omRequest.getClientId(), newOmRequest.getClientId());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user