HDDS-1175. Serve read requests directly from RocksDB. (#557)

HDDS-1175. Serve read requests directly from RocksDB.
This commit is contained in:
Hanisha Koneru 2019-03-06 19:44:55 -08:00 committed by GitHub
parent a55fc36299
commit bb12e81ec8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 374 additions and 45 deletions

View File

@ -1646,6 +1646,16 @@
</description>
</property>
<property>
<name>ozone.om.ratis.server.role.check.interval</name>
<value>15s</value>
<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
<description>The interval between OM leader performing a role
check on its ratis server. Ratis server informs OM if it
loses the leader role. The scheduled check is a secondary
check to ensure that the leader role is updated periodically.</description>
</property>
<property>
<name>ozone.acl.authorizer.class</name>

View File

@ -183,6 +183,13 @@ private OMConfigKeys() {
OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
= TimeDuration.valueOf(120, TimeUnit.SECONDS);
// OM Leader server role check interval
public static final String OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY
= "ozone.om.ratis.server.role.check.interval";
public static final TimeDuration
OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
= TimeDuration.valueOf(15, TimeUnit.SECONDS);
public static final String OZONE_OM_KERBEROS_KEYTAB_FILE_KEY = "ozone.om."
+ "kerberos.keytab.file";
public static final String OZONE_OM_KERBEROS_PRINCIPAL_KEY = "ozone.om"

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.exceptions;
import java.io.IOException;
/**
 * Raised on behalf of
 * {@link org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB} when
 * a read request arrives at an OM node that is not the leader.
 *
 * <p>The exception optionally carries the peer id of the node that the
 * rejecting OM suggests as leader; when no suggestion is available
 * {@link #getSuggestedLeaderNodeId()} returns {@code null}.
 */
public class NotLeaderException extends IOException {

  private final String currentPeerId;
  private final String leaderPeerId;

  /**
   * Builds an exception for the case where the leader is not known.
   *
   * @param currentPeerIdStr peer id of the OM that rejected the request
   */
  public NotLeaderException(String currentPeerIdStr) {
    this("OM " + currentPeerIdStr + " is not the leader. Could not " +
        "determine the leader node.", currentPeerIdStr, null);
  }

  /**
   * Builds an exception that also names a suggested leader.
   *
   * @param currentPeerIdStr peer id of the OM that rejected the request
   * @param suggestedLeaderPeerIdStr peer id of the suggested leader OM
   */
  public NotLeaderException(String currentPeerIdStr,
      String suggestedLeaderPeerIdStr) {
    this("OM " + currentPeerIdStr + " is not the leader. Suggested leader is "
        + suggestedLeaderPeerIdStr, currentPeerIdStr,
        suggestedLeaderPeerIdStr);
  }

  /**
   * Shared initializer used by both public constructors.
   */
  private NotLeaderException(String message, String currentPeerIdStr,
      String leaderPeerIdStr) {
    super(message);
    this.currentPeerId = currentPeerIdStr;
    this.leaderPeerId = leaderPeerIdStr;
  }

  /**
   * @return the suggested leader's peer id, or {@code null} when the
   *         exception was created without a suggestion.
   */
  public String getSuggestedLeaderNodeId() {
    return leaderPeerId;
  }
}

View File

@ -226,8 +226,14 @@ public Class<OzoneManagerProtocolPB> getInterface() {
* not match the current leaderOMNodeId cached by the proxy provider.
*/
public void performFailoverIfRequired(String newLeaderOMNodeId) {
if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
if (newLeaderOMNodeId == null) {
LOG.debug("No suggested leader nodeId. Performing failover to next peer" +
" node");
performFailover(null);
} else {
if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
}
}
}

View File

@ -15,7 +15,7 @@
* the License.
*/
package org.apache.hadoop.ozone.om.ratis;
package org.apache.hadoop.ozone.om.helpers;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
@ -25,8 +25,6 @@
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
@ -54,14 +52,15 @@ private OMRatisHelper() {
/**
* Creates a new RaftClient object.
* @param rpcType Replication Type
* @param omId OM id of the client
* @param group RaftGroup
*
* @param rpcType Replication Type
* @param omId OM id of the client
* @param group RaftGroup
* @param retryPolicy Retry policy
* @return RaftClient object
*/
static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
group, RetryPolicy retryPolicy, Configuration conf) {
public static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
group, RetryPolicy retryPolicy, Configuration conf) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, omId, group);
final RaftProperties properties = new RaftProperties();
RaftConfigKeys.Rpc.setType(properties, rpcType);
@ -85,36 +84,27 @@ static RaftPeerId getRaftPeerId(String omId) {
return RaftPeerId.valueOf(omId);
}
static ByteString convertRequestToByteString(OMRequest request) {
public static ByteString convertRequestToByteString(OMRequest request) {
byte[] requestBytes = request.toByteArray();
return ByteString.copyFrom(requestBytes);
}
static OMRequest convertByteStringToOMRequest(ByteString byteString)
public static OMRequest convertByteStringToOMRequest(ByteString byteString)
throws InvalidProtocolBufferException {
byte[] bytes = byteString.toByteArray();
return OMRequest.parseFrom(bytes);
}
static Message convertResponseToMessage(OMResponse response) {
public static Message convertResponseToMessage(OMResponse response) {
byte[] requestBytes = response.toByteArray();
return Message.valueOf(ByteString.copyFrom(requestBytes));
}
static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
throws InvalidProtocolBufferException {
public static OMResponse getOMResponseFromRaftClientReply(
RaftClientReply reply) throws InvalidProtocolBufferException {
byte[] bytes = reply.getMessage().getContent().toByteArray();
return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
.setLeaderOMNodeId(reply.getReplierId())
.build();
}
static OMResponse getErrorResponse(Type cmdType, Exception e) {
return OMResponse.newBuilder()
.setCmdType(cmdType)
.setSuccess(false)
.setMessage(e.getMessage())
.setStatus(Status.INTERNAL_ERROR)
.build();
}
}
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
@ -195,29 +196,49 @@ public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf,
private OzoneManagerProtocolPB createRetryProxy(
OMFailoverProxyProvider failoverProxyProvider,
int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {
RetryPolicy retryPolicyOnNetworkException = RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
maxFailovers, maxRetries, delayMillis, maxDelayBase);
RetryPolicy retryPolicy = new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception exception, int retries,
int failovers, boolean isIdempotentOrAtMostOnce)
throws Exception {
if (exception instanceof EOFException ||
exception instanceof ServiceException) {
if (retries < maxRetries && failovers < maxFailovers) {
return RetryAction.FAILOVER_AND_RETRY;
if (exception instanceof ServiceException) {
Throwable cause = exception.getCause();
if (cause instanceof NotLeaderException) {
NotLeaderException notLeaderException = (NotLeaderException) cause;
omFailoverProxyProvider.performFailoverIfRequired(
notLeaderException.getSuggestedLeaderNodeId());
return getRetryAction(RetryAction.RETRY, retries, failovers);
} else {
FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
"Attempted {} retries and {} failovers", retries, failovers);
return RetryAction.FAIL;
return getRetryAction(RetryAction.FAILOVER_AND_RETRY, retries,
failovers);
}
} else if (exception instanceof EOFException) {
return getRetryAction(RetryAction.FAILOVER_AND_RETRY, retries,
failovers);
} else {
return retryPolicyOnNetworkException.shouldRetry(
exception, retries, failovers, isIdempotentOrAtMostOnce);
exception, retries, failovers, isIdempotentOrAtMostOnce);
}
}
private RetryAction getRetryAction(RetryAction fallbackAction,
int retries, int failovers) {
if (retries < maxRetries && failovers < maxFailovers) {
return fallbackAction;
} else {
FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
"Attempted {} retries and {} failovers", retries, failovers);
return RetryAction.FAIL;
}
}
};
OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
return proxy;

View File

@ -52,8 +52,6 @@
.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
@ -75,7 +73,7 @@ public class TestOzoneManagerHA {
public ExpectedException exception = ExpectedException.none();
@Rule
public Timeout timeout = new Timeout(120_000);
public Timeout timeout = new Timeout(300_000);
/**
* Create a MiniDFSCluster for testing.
@ -93,7 +91,6 @@ public void init() throws Exception {
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
@ -313,4 +310,41 @@ public void testOMRetryProxy() throws Exception {
"3 retries and 3 failovers"));
}
}
/**
* Verifies that a read request (getVolume) issued through a client created
* for each OM in turn succeeds, and that afterwards that client's proxy
* provider reports the same node id that the shared client's provider
* reported before the loop started.
*/
@Test
public void testReadRequest() throws Exception {
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
objectStore.createVolume(volumeName);
OMFailoverProxyProvider omFailoverProxyProvider =
objectStore.getClientProxy().getOMProxyProvider();
// Node id the shared provider currently points to; presumably the leader,
// since the createVolume call above went through it -- TODO confirm.
String currentLeaderNodeId = omFailoverProxyProvider
.getCurrentProxyOMNodeId();
// A read request from any proxy should failover to the current leader OM
for (int i = 0; i < numOfOMs; i++) {
// Failover OMFailoverProxyProvider to OM at index i
OzoneManager ozoneManager = cluster.getOzoneManager(i);
String omHostName = ozoneManager.getOmRpcServerAddr().getHostName();
int rpcPort = ozoneManager.getOmRpcServerAddr().getPort();
// Get the ObjectStore and FailoverProxyProvider for OM at index i
// NOTE(review): the client returned by getRpcClient is not retained or
// closed here -- confirm whether it should be released after each iteration.
final ObjectStore store = OzoneClientFactory.getRpcClient(
omHostName, rpcPort, conf).getObjectStore();
final OMFailoverProxyProvider proxyProvider =
store.getClientProxy().getOMProxyProvider();
// Failover to the OM node that the objectStore points to
// NOTE(review): this fails over the shared omFailoverProxyProvider, not the
// per-iteration proxyProvider that is asserted on below -- confirm which
// provider was intended.
omFailoverProxyProvider.performFailoverIfRequired(
ozoneManager.getOMNodId());
// A read request should result in the proxyProvider failing over to
// leader node.
OzoneVolume volume = store.getVolume(volumeName);
Assert.assertEquals(volumeName, volume.getName());
Assert.assertEquals(currentLeaderNodeId,
proxyProvider.getCurrentProxyOMNodeId());
}
}
}

View File

@ -1236,8 +1236,8 @@ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
ProtobufRpcEngine.class);
BlockingService omService = newReflectiveBlockingService(
new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient,
isRatisEnabled));
new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisServer,
omRatisClient, isRatisEnabled));
return startRpcServer(configuration, omNodeRpcAddr,
OzoneManagerProtocolPB.class, omService,
handlerCount);

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos

View File

@ -27,8 +27,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
@ -41,6 +46,11 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
@ -50,6 +60,7 @@
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
@ -69,7 +80,22 @@ public final class OzoneManagerRatisServer {
private final RaftGroupId raftGroupId;
private final RaftGroup raftGroup;
private final RaftPeerId raftPeerId;
private final OzoneManagerProtocol ozoneManager;
private final ClientId clientId = ClientId.randomId();
private final ScheduledExecutorService scheduledRoleChecker;
private long roleCheckInitialDelayMs = 1000; // 1 second default
private long roleCheckIntervalMs;
private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
private Optional<RaftPeerRole> cachedPeerRole = Optional.empty();
private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty();
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
/**
 * Produces the next call id from the shared counter.
 *
 * @return the counter value before the increment, with the sign bit cleared
 */
private static long nextCallId() {
  final long rawId = CALL_ID_COUNTER.getAndIncrement();
  // Masking with Long.MAX_VALUE clears the sign bit, so the id is never
  // negative even after the counter overflows.
  return rawId & Long.MAX_VALUE;
}
/**
* Returns an OM Ratis server.
@ -108,6 +134,20 @@ private OzoneManagerRatisServer(Configuration conf, OzoneManagerProtocol om,
.setProperties(serverProperties)
.setStateMachine(getStateMachine(this.raftGroupId))
.build();
// Run a scheduler to check and update the server role on the leader
// periodically
this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
this.scheduledRoleChecker.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Run this check only on the leader OM
if (cachedPeerRole.isPresent() &&
cachedPeerRole.get() == RaftPeerRole.LEADER) {
updateServerRole();
}
}
}, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
}
/**
@ -156,7 +196,11 @@ public RaftGroup getRaftGroup() {
* Returns OzoneManager StateMachine.
*/
private BaseStateMachine getStateMachine(RaftGroupId gid) {
return new OzoneManagerStateMachine(ozoneManager);
return new OzoneManagerStateMachine(this);
}
/**
* Returns the OzoneManagerProtocol instance held by this Ratis server.
* @return the value of the ozoneManager field
*/
public OzoneManagerProtocol getOzoneManager() {
return ozoneManager;
}
/**
@ -323,6 +367,19 @@ private RaftProperties newRaftProperties(Configuration conf) {
RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
nodeFailureTimeout);
TimeUnit roleCheckIntervalUnit =
OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
.getUnit();
long roleCheckIntervalDuration = conf.getTimeDuration(
OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY,
OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
.getDuration(), nodeFailureTimeoutUnit);
this.roleCheckIntervalMs = TimeDuration.valueOf(
roleCheckIntervalDuration, roleCheckIntervalUnit)
.toLong(TimeUnit.MILLISECONDS);
this.roleCheckInitialDelayMs = leaderElectionMinTimeout
.toLong(TimeUnit.MILLISECONDS);
/**
* TODO: when ratis snapshots are implemented, set snapshot threshold and
* queue size.
@ -331,6 +388,104 @@ private RaftProperties newRaftProperties(Configuration conf) {
return properties;
}
/**
 * Inspects only the cached peer role; the Ratis server is not queried.
 * The cached value is read while holding the read lock.
 *
 * @return true if a role is cached and it is LEADER, false otherwise.
 */
private boolean checkCachedPeerRoleIsLeader() {
  final ReentrantReadWriteLock.ReadLock guard = this.roleCheckLock.readLock();
  guard.lock();
  try {
    return cachedPeerRole.isPresent()
        && cachedPeerRole.get() == RaftPeerRole.LEADER;
  } finally {
    guard.unlock();
  }
}
/**
 * Tells whether this OM node is the leader. The cached role is trusted when
 * it already says LEADER; otherwise the role is refreshed from the Ratis
 * server once and the cache is consulted again.
 *
 * @return true if Leader, false otherwise.
 */
public boolean isLeader() {
  if (!checkCachedPeerRoleIsLeader()) {
    // Cache does not say LEADER: refresh it from the Ratis server and
    // answer from the refreshed value.
    updateServerRole();
    return checkCachedPeerRoleIsLeader();
  }
  return true;
}
/**
 * Reads the cached leader peer id while holding the read lock.
 *
 * @return the cached RaftPeerId of the suggested leader node; empty when
 *         none is cached.
 */
public Optional<RaftPeerId> getCachedLeaderPeerId() {
  final ReentrantReadWriteLock.ReadLock guard = this.roleCheckLock.readLock();
  guard.lock();
  try {
    return cachedLeaderPeerId;
  } finally {
    guard.unlock();
  }
}
/**
 * Get the group info (peer role and leader peer id) from Ratis server and
 * update the cached OM server role.
 *
 * <ul>
 *   <li>LEADER: this peer's own id is cached as the leader id.</li>
 *   <li>FOLLOWER: the leader id reported in the follower info is cached;
 *       if that id is empty, no leader id is cached.</li>
 *   <li>any other role: the role is cached without a leader id.</li>
 * </ul>
 * If the group info cannot be retrieved, both cached values are cleared.
 */
public void updateServerRole() {
  try {
    GroupInfoReply groupInfo = getGroupInfo();
    RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
    RaftPeerRole thisNodeRole = roleInfoProto.getRole();

    if (thisNodeRole.equals(RaftPeerRole.LEADER)) {
      setServerRole(thisNodeRole, raftPeerId);
    } else if (thisNodeRole.equals(RaftPeerRole.FOLLOWER)) {
      ByteString leaderNodeId = roleInfoProto.getFollowerInfo()
          .getLeaderInfo().getId().getId();
      // The reported leader id can be empty (presumably when this follower
      // does not currently know a leader, e.g. during an election). Do not
      // turn an empty id into a RaftPeerId; cache "no leader" instead so
      // callers never see a bogus suggested leader.
      RaftPeerId leaderPeerId = null;
      if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
        leaderPeerId = RaftPeerId.valueOf(leaderNodeId);
      }
      setServerRole(thisNodeRole, leaderPeerId);
    } else {
      setServerRole(thisNodeRole, null);
    }
  } catch (IOException e) {
    LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to " +
        "{} and resetting leader info.", RaftPeerRole.UNRECOGNIZED, e);
    // Both cached values are cleared (become empty Optionals).
    setServerRole(null, null);
  }
}
/**
 * Replaces the cached peer role and cached leader peer id together while
 * holding the write lock. A null argument clears the corresponding cached
 * value.
 *
 * @param currentRole role to cache for this peer, may be null
 * @param leaderPeerId leader peer id to cache, may be null
 */
private void setServerRole(RaftPeerRole currentRole,
    RaftPeerId leaderPeerId) {
  final ReentrantReadWriteLock.WriteLock guard =
      this.roleCheckLock.writeLock();
  guard.lock();
  try {
    this.cachedPeerRole = Optional.ofNullable(currentRole);
    this.cachedLeaderPeerId = Optional.ofNullable(leaderPeerId);
  } finally {
    guard.unlock();
  }
}
/**
 * Asks the local Ratis server for the group info of this server's raft
 * group, using this server's client id, peer id and a fresh call id.
 *
 * @return the reply returned by the Ratis server
 * @throws IOException if the Ratis server call fails
 */
private GroupInfoReply getGroupInfo() throws IOException {
  final long callId = nextCallId();
  return server.getGroupInfo(
      new GroupInfoRequest(clientId, raftPeerId, raftGroupId, callId));
}
public int getServerPort() {
return port;
}

View File

@ -20,10 +20,12 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
@ -54,11 +56,15 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
LoggerFactory.getLogger(ContainerStateMachine.class);
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final OzoneManagerRatisServer omRatisServer;
private final OzoneManagerProtocol ozoneManager;
private final OzoneManagerRequestHandler handler;
private RaftGroupId raftGroupId;
public OzoneManagerStateMachine(OzoneManagerProtocol om) {
this.handler = new OzoneManagerRequestHandler(om);
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
this.ozoneManager = omRatisServer.getOzoneManager();
this.handler = new OzoneManagerRequestHandler(ozoneManager);
}
/**
@ -137,6 +143,15 @@ public CompletableFuture<Message> query(Message request) {
}
}
/**
* Notifies the state machine that the raft peer is no longer leader.
* The notification is used to make the OM Ratis server refresh its cached
* role via {@code updateServerRole()}.
*
* @param pendingEntries pending transaction contexts; not used here
* @throws IOException declared by the overridden method
*/
@Override
public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
throws IOException {
omRatisServer.updateServerRole();
}
/**
* Submits request to OM and returns the response Message.
* @param request OMRequest

View File

@ -17,18 +17,24 @@
package org.apache.hadoop.ozone.protocolPB;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import io.opentracing.Scope;
import org.apache.ratis.protocol.RaftPeerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
/**
* This class is the server-side translator that forwards requests received on
* {@link OzoneManagerProtocolPB}
@ -38,6 +44,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
OzoneManagerProtocolPB {
private static final Logger LOG = LoggerFactory
.getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);
private final OzoneManagerRatisServer omRatisServer;
private final OzoneManagerRatisClient omRatisClient;
private final OzoneManagerRequestHandler handler;
private final boolean isRatisEnabled;
@ -48,9 +55,10 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
* @param impl OzoneManagerProtocolPB
*/
public OzoneManagerProtocolServerSideTranslatorPB(
OzoneManagerProtocol impl, OzoneManagerRatisClient ratisClient,
boolean enableRatis) {
OzoneManagerProtocol impl, OzoneManagerRatisServer ratisServer,
OzoneManagerRatisClient ratisClient, boolean enableRatis) {
handler = new OzoneManagerRequestHandler(impl);
this.omRatisServer = ratisServer;
this.omRatisClient = ratisClient;
this.isRatisEnabled = enableRatis;
}
@ -68,7 +76,12 @@ public OMResponse submitRequest(RpcController controller,
request.getTraceID());
try {
if (isRatisEnabled) {
return submitRequestToRatis(request);
// Check if the request is a read only request
if (OmUtils.isReadOnly(request)) {
return submitReadRequestToOM(request);
} else {
return submitRequestToRatis(request);
}
} else {
return submitRequestDirectlyToOM(request);
}
@ -85,6 +98,32 @@ private OMResponse submitRequestToRatis(OMRequest request)
return omRatisClient.sendCommand(request);
}
/**
 * Serves a read request locally when this OM is the leader; otherwise
 * rejects it with a {@link NotLeaderException} wrapped in a
 * ServiceException.
 *
 * <p>When a leader peer id is cached on the Ratis server it is included in
 * the exception as the suggested leader; when none is cached the exception
 * is created without a suggestion.
 *
 * @param request the read-only OMRequest
 * @return the response produced by the request handler
 * @throws ServiceException wrapping a NotLeaderException if this OM is not
 *         the leader
 */
private OMResponse submitReadRequestToOM(OMRequest request)
    throws ServiceException {
  // Check if this OM is the leader.
  if (omRatisServer.isLeader()) {
    return handler.handle(request);
  }

  RaftPeerId raftPeerId = omRatisServer.getRaftPeerId();
  Optional<RaftPeerId> leaderRaftPeerId = omRatisServer
      .getCachedLeaderPeerId();

  NotLeaderException notLeaderException;
  if (leaderRaftPeerId.isPresent()) {
    // Unwrap the Optional: calling toString() on the Optional itself would
    // yield "Optional[...]" / "Optional.empty" rather than the peer id.
    notLeaderException = new NotLeaderException(
        raftPeerId.toString(), leaderRaftPeerId.get().toString());
  } else {
    notLeaderException = new NotLeaderException(raftPeerId.toString());
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug(notLeaderException.getMessage());
  }

  throw new ServiceException(notLeaderException);
}
/**
* Submits request directly to OM.
*/