From 12402b7a74a9069230ebfdc9e5c2ee0e28a23990 Mon Sep 17 00:00:00 2001 From: Hanisha Koneru Date: Tue, 5 Mar 2019 09:24:22 -0800 Subject: [PATCH] HDDS-1072. Implement RetryProxy and FailoverProxy for OM client. --- .../apache/hadoop/ozone/OzoneConfigKeys.java | 17 ++ .../org/apache/hadoop/ozone/OzoneConsts.java | 3 + .../src/main/resources/ozone-default.xml | 43 ++- .../ozone/client/protocol/ClientProtocol.java | 4 +- .../hadoop/ozone/client/rest/RestClient.java | 4 +- .../hadoop/ozone/client/rpc/RpcClient.java | 21 +- .../ozone/client/rpc/ha/OMProxyInfo.java | 49 ---- .../ozone/client/rpc/ha/OMProxyProvider.java | 177 ------------ .../ozone/om/ha/OMFailoverProxyProvider.java | 269 ++++++++++++++++++ .../hadoop/ozone/om}/ha/package-info.java | 2 +- .../om/protocol/OzoneManagerProtocol.java | 9 + ...ManagerProtocolClientSideTranslatorPB.java | 107 ++++++- .../src/main/proto/OzoneManagerProtocol.proto | 2 + .../hadoop/ozone/MiniOzoneHAClusterImpl.java | 38 ++- .../rpc/TestOzoneRpcClientAbstract.java | 8 +- .../hadoop/ozone/om/TestOzoneManagerHA.java | 184 +++++++++--- .../apache/hadoop/ozone/om/OzoneManager.java | 6 + .../hadoop/ozone/om/ratis/OMRatisHelper.java | 9 +- .../om/ratis/OzoneManagerRatisClient.java | 27 +- ...ManagerProtocolServerSideTranslatorPB.java | 3 +- .../om/ratis/TestOzoneManagerRatisServer.java | 23 -- 21 files changed, 660 insertions(+), 345 deletions(-) delete mode 100644 hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java delete mode 100644 hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java rename hadoop-ozone/{client/src/main/java/org/apache/hadoop/ozone/client/rpc => common/src/main/java/org/apache/hadoop/ozone/om}/ha/package-info.java (94%) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index cd40f7c091..0d73905cd7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -379,6 +379,23 @@ public final class OzoneConfigKeys { public static final String OZONE_FS_ISOLATED_CLASSLOADER = "ozone.fs.isolated-classloader"; + // Ozone Client Retry and Failover configurations + public static final String OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY = + "ozone.client.retry.max.attempts"; + public static final int OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT = + 10; + public static final String OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY = + "ozone.client.failover.max.attempts"; + public static final int OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT = + 15; + public static final String OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY = + "ozone.client.failover.sleep.base.millis"; + public static final int OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT = + 500; + public static final String OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY = + "ozone.client.failover.sleep.max.millis"; + public static final int OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT = + 15000; public static final String OZONE_FREON_HTTP_ENABLED_KEY = "ozone.freon.http.enabled"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 45b46b843a..8e3b02a950 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -276,4 +276,7 @@ private OzoneConsts() { // Default OMServiceID for OM Ratis servers to use as RaftGroupId public static final String OM_SERVICE_ID_DEFAULT = "omServiceIdDefault"; + + // Dummy OMNodeID for OM Clients to use for a non-HA OM setup + public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy"; } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 8469fdce20..f7fecb7b67 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -2029,4 +2029,45 @@ - + + ozone.client.retry.max.attempts + 10 + + Max retry attempts for Ozone RpcClient talking to OzoneManagers. + + + + ozone.client.failover.max.attempts + 15 + + Expert only. The number of client failover attempts that should be + made before the failover is considered failed. + + + + ozone.client.failover.sleep.base.millis + 500 + + Expert only. The time to wait, in milliseconds, between failover + attempts increases exponentially as a function of the number of + attempts made so far, with a random factor of +/- 50%. This option + specifies the base value used in the failover calculation. The + first failover will retry immediately. The 2nd failover attempt + will delay at least ozone.client.failover.sleep.base.millis + milliseconds. And so on. + + + + ozone.client.failover.sleep.max.millis + 15000 + + Expert only. The time to wait, in milliseconds, between failover + attempts increases exponentially as a function of the number of + attempts made so far, with a random factor of +/- 50%. This option + specifies the maximum value to wait between failovers. + Specifically, the time between two failover attempts will not + exceed +/- 50% of ozone.client.failover.sleep.max.millis + milliseconds. + + + \ No newline at end of file diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index 494afae7e8..2bf90892c7 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider; +import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; @@ -510,5 +510,5 @@ void cancelDelegationToken(Token token) S3SecretValue getS3Secret(String kerberosID) throws IOException; @VisibleForTesting - OMProxyProvider getOMProxyProvider(); + OMFailoverProxyProvider getOMProxyProvider(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java index b69d9728bf..eea28091d3 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java @@ -42,8 +42,8 @@ import org.apache.hadoop.ozone.client.rest.response.BucketInfo; import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails; import org.apache.hadoop.ozone.client.rest.response.VolumeInfo; -import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; @@ -725,7 +725,7 @@ public S3SecretValue getS3Secret(String kerberosID) throws IOException { } @Override - public OMProxyProvider getOMProxyProvider() { + public OMFailoverProxyProvider getOMProxyProvider() { return null; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 0875046bda..7fab65041f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -50,8 +50,8 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; +import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo; -import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; @@ -66,7 +66,8 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; -import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; +import org.apache.hadoop.ozone.om.protocolPB + .OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.om.OMConfigKeys; @@ -85,6 +86,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.io.Text; import org.apache.logging.log4j.util.Strings; +import org.apache.ratis.protocol.ClientId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,7 +109,6 @@ public class RpcClient implements ClientProtocol { private final OzoneConfiguration conf; private final StorageContainerLocationProtocol storageContainerLocationClient; - private final OMProxyProvider omProxyProvider; private final OzoneManagerProtocol ozoneManagerClient; private final XceiverClientManager xceiverClientManager; private final int chunkSize; @@ -121,6 +122,7 @@ public class RpcClient implements ClientProtocol { private final long streamBufferMaxSize; private final long blockSize; private final long watchTimeout; + private final ClientId clientId = ClientId.randomId(); /** * Creates RpcClient instance with the given configuration. @@ -135,13 +137,8 @@ public RpcClient(Configuration conf) throws IOException { OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT); this.groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS, OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT); - RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class, - ProtobufRpcEngine.class); - this.omProxyProvider = new OMProxyProvider(conf, ugi); - this.ozoneManagerClient = - TracingUtil.createProxy( - this.omProxyProvider.getProxy(), - OzoneManagerProtocol.class); + this.ozoneManagerClient = new OzoneManagerProtocolClientSideTranslatorPB( + this.conf, clientId.toString(), ugi); long scmVersion = RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class); @@ -492,8 +489,8 @@ public S3SecretValue getS3Secret(String kerberosID) throws IOException { @Override @VisibleForTesting - public OMProxyProvider getOMProxyProvider() { - return omProxyProvider; + public OMFailoverProxyProvider getOMProxyProvider() { + return ozoneManagerClient.getOMFailoverProxyProvider(); } @Override diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java deleted file mode 100644 index 01e55620b9..0000000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client.rpc.ha; - -import org.apache.hadoop.ozone.om.protocolPB - .OzoneManagerProtocolClientSideTranslatorPB; - -import java.net.InetSocketAddress; - -/** - * Proxy information of OM. - */ -public final class OMProxyInfo { - private InetSocketAddress address; - private OzoneManagerProtocolClientSideTranslatorPB omClient; - - public OMProxyInfo(InetSocketAddress addr) { - this.address = addr; - } - - public InetSocketAddress getAddress() { - return address; - } - - public OzoneManagerProtocolClientSideTranslatorPB getOMProxy() { - return omClient; - } - - public void setOMProxy( - OzoneManagerProtocolClientSideTranslatorPB clientProxy) { - this.omClient = clientProxy; - } -} diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java deleted file mode 100644 index 574cb5f131..0000000000 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.client.rpc.ha; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.ozone.OmUtils; -import org.apache.hadoop.ozone.om.protocolPB - .OzoneManagerProtocolClientSideTranslatorPB; -import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ratis.protocol.ClientId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY; - -/** - * A failover proxy provider implementation which allows clients to configure - * multiple OMs to connect to. In case of OM failover, client can try - * connecting to another OM node from the list of proxies. - */ -public class OMProxyProvider implements Closeable { - - public static final Logger LOG = - LoggerFactory.getLogger(OMProxyProvider.class); - - private List omProxies; - - private int currentProxyIndex = 0; - - private final Configuration conf; - private final long omVersion; - private final UserGroupInformation ugi; - private ClientId clientId = ClientId.randomId(); - - public OMProxyProvider(Configuration configuration, - UserGroupInformation ugi) { - this.conf = configuration; - this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class); - this.ugi = ugi; - loadOMClientConfigs(conf); - } - - private void loadOMClientConfigs(Configuration config) { - this.omProxies = new ArrayList<>(); - - Collection omServiceIds = config.getTrimmedStringCollection( - OZONE_OM_SERVICE_IDS_KEY); - - if (omServiceIds.size() > 1) { - throw new IllegalArgumentException("Multi-OM Services is not supported." + - " Please configure only one OM Service ID in " + - OZONE_OM_SERVICE_IDS_KEY); - } - - for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) { - Collection omNodeIds = OmUtils.getOMNodeIds(config, serviceId); - - for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) { - - String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, - serviceId, nodeId); - String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey); - if (rpcAddrStr == null) { - continue; - } - - InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr); - - // Add the OM client proxy info to list of proxies - if (addr != null) { - OMProxyInfo omProxyInfo = new OMProxyInfo(addr); - omProxies.add(omProxyInfo); - } else { - LOG.error("Failed to create OM proxy at address {}", rpcAddrStr); - } - } - } - - if (omProxies.isEmpty()) { - throw new IllegalArgumentException("Could not find any configured " + - "addresses for OM. Please configure the system with " - + OZONE_OM_ADDRESS_KEY); - } - } - - private OzoneManagerProtocolClientSideTranslatorPB getOMClient( - InetSocketAddress omAddress) throws IOException { - return new OzoneManagerProtocolClientSideTranslatorPB( - RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi, - conf, NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf)), clientId.toString()); - } - - /** - * Get the proxy object which should be used until the next failover event - * occurs. RPC proxy object is intialized lazily. - * @return the OM proxy object to invoke methods upon - */ - public synchronized OzoneManagerProtocolClientSideTranslatorPB getProxy() { - OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyIndex); - return createOMClientIfNeeded(currentOMProxyInfo); - } - - private OzoneManagerProtocolClientSideTranslatorPB createOMClientIfNeeded( - OMProxyInfo proxyInfo) { - if (proxyInfo.getOMProxy() == null) { - try { - proxyInfo.setOMProxy(getOMClient(proxyInfo.getAddress())); - } catch (IOException ioe) { - LOG.error("{} Failed to create RPC proxy to OM at {}", - this.getClass().getSimpleName(), proxyInfo.getAddress(), ioe); - throw new RuntimeException(ioe); - } - } - return proxyInfo.getOMProxy(); - } - - /** - * Called whenever an error warrants failing over. It is determined by the - * retry policy. - */ - public void performFailover() { - incrementProxyIndex(); - } - - synchronized void incrementProxyIndex() { - currentProxyIndex = (currentProxyIndex + 1) % omProxies.size(); - } - - /** - * Close all the proxy objects which have been opened over the lifetime of - * the proxy provider. - */ - @Override - public synchronized void close() throws IOException { - for (OMProxyInfo proxy : omProxies) { - OzoneManagerProtocolClientSideTranslatorPB omProxy = proxy.getOMProxy(); - if (omProxy != null) { - RPC.stopProxy(omProxy); - } - } - } - - @VisibleForTesting - public List getOMProxies() { - return omProxies; - } -} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java new file mode 100644 index 0000000000..5c1b39fc0a --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ha; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY; + +/** + * A failover proxy provider implementation which allows clients to configure + * multiple OMs to connect to. In case of OM failover, client can try + * connecting to another OM node from the list of proxies. + */ +public class OMFailoverProxyProvider implements + FailoverProxyProvider, Closeable { + + public static final Logger LOG = + LoggerFactory.getLogger(OMFailoverProxyProvider.class); + + // Map of OMNodeID to its proxy + private Map omProxies; + private List omNodeIDList; + + private String currentProxyOMNodeId; + private int currentProxyIndex; + + private final Configuration conf; + private final long omVersion; + private final UserGroupInformation ugi; + + public OMFailoverProxyProvider(OzoneConfiguration configuration, + UserGroupInformation ugi) throws IOException { + this.conf = configuration; + this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class); + this.ugi = ugi; + loadOMClientConfigs(conf); + + currentProxyIndex = 0; + currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex); + } + + /** + * Class to store proxy information. + */ + public final class OMProxyInfo + extends FailoverProxyProvider.ProxyInfo { + private InetSocketAddress address; + + OMProxyInfo(OzoneManagerProtocolPB proxy, String proxyInfoStr, + InetSocketAddress addr) { + super(proxy, proxyInfoStr); + this.address = addr; + } + + public InetSocketAddress getAddress() { + return address; + } + } + + private void loadOMClientConfigs(Configuration config) throws IOException { + this.omProxies = new HashMap<>(); + this.omNodeIDList = new ArrayList<>(); + + Collection omServiceIds = config.getTrimmedStringCollection( + OZONE_OM_SERVICE_IDS_KEY); + + if (omServiceIds.size() > 1) { + throw new IllegalArgumentException("Multi-OM Services is not supported." + + " Please configure only one OM Service ID in " + + OZONE_OM_SERVICE_IDS_KEY); + } + + for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) { + Collection omNodeIds = OmUtils.getOMNodeIds(config, serviceId); + + for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) { + + String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, + serviceId, nodeId); + String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey); + if (rpcAddrStr == null) { + continue; + } + + InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr); + + // Add the OM client proxy info to list of proxies + if (addr != null) { + StringBuilder proxyInfo = new StringBuilder() + .append(nodeId).append("(") + .append(NetUtils.getHostPortString(addr)).append(")"); + OMProxyInfo omProxyInfo = new OMProxyInfo(null, + proxyInfo.toString(), addr); + + // For a non-HA OM setup, nodeId might be null. If so, we assign it + // a dummy value + if (nodeId == null) { + nodeId = OzoneConsts.OM_NODE_ID_DUMMY; + } + omProxies.put(nodeId, omProxyInfo); + omNodeIDList.add(nodeId); + + } else { + LOG.error("Failed to create OM proxy for {} at address {}", + nodeId, rpcAddrStr); + } + } + } + + if (omProxies.isEmpty()) { + throw new IllegalArgumentException("Could not find any configured " + + "addresses for OM. Please configure the system with " + + OZONE_OM_ADDRESS_KEY); + } + } + + @VisibleForTesting + public synchronized String getCurrentProxyOMNodeId() { + return currentProxyOMNodeId; + } + + private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress) + throws IOException { + RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class, + ProtobufRpcEngine.class); + return RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi, + conf, NetUtils.getDefaultSocketFactory(conf), + Client.getRpcTimeout(conf)); + } + + /** + * Get the proxy object which should be used until the next failover event + * occurs. RPC proxy object is intialized lazily. + * @return the OM proxy object to invoke methods upon + */ + @Override + public synchronized OMProxyInfo getProxy() { + OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyOMNodeId); + createOMProxyIfNeeded(currentOMProxyInfo); + return currentOMProxyInfo; + } + + /** + * Creates OM proxy object if it does not already exist. + */ + private OMProxyInfo createOMProxyIfNeeded(OMProxyInfo proxyInfo) { + if (proxyInfo.proxy == null) { + try { + proxyInfo.proxy = createOMProxy(proxyInfo.address); + } catch (IOException ioe) { + LOG.error("{} Failed to create RPC proxy to OM at {}", + this.getClass().getSimpleName(), proxyInfo.address, ioe); + throw new RuntimeException(ioe); + } + } + return proxyInfo; + } + + /** + * Called whenever an error warrants failing over. It is determined by the + * retry policy. + */ + @Override + public void performFailover(OzoneManagerProtocolPB currentProxy) { + int newProxyIndex = incrementProxyIndex(); + LOG.debug("Failing over OM proxy to index: {}, nodeId: {}", + newProxyIndex, omNodeIDList.get(newProxyIndex)); + } + + /** + * Update the proxy index to the next proxy in the list. + * @return the new proxy index + */ + private synchronized int incrementProxyIndex() { + currentProxyIndex = (currentProxyIndex + 1) % omProxies.size(); + currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex); + return currentProxyIndex; + } + + @Override + public Class getInterface() { + return OzoneManagerProtocolPB.class; + } + + /** + * Performs failover if the leaderOMNodeId returned through OMReponse does + * not match the current leaderOMNodeId cached by the proxy provider. + */ + public void performFailoverIfRequired(String newLeaderOMNodeId) { + if (updateLeaderOMNodeId(newLeaderOMNodeId)) { + LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId); + } + } + + /** + * Failover to the OM proxy specified by the new leader OMNodeId. + * @param newLeaderOMNodeId OMNodeId to failover to. + * @return true if failover is successful, false otherwise. + */ + synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) { + if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) { + if (omProxies.containsKey(newLeaderOMNodeId)) { + currentProxyOMNodeId = newLeaderOMNodeId; + currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId); + return true; + } + } + return false; + } + + /** + * Close all the proxy objects which have been opened over the lifetime of + * the proxy provider. + */ + @Override + public synchronized void close() throws IOException { + for (OMProxyInfo proxy : omProxies.values()) { + OzoneManagerProtocolPB omProxy = proxy.proxy; + if (omProxy != null) { + RPC.stopProxy(omProxy); + } + } + } + + @VisibleForTesting + public List getOMProxies() { + return new ArrayList<>(omProxies.values()); + } +} + diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java similarity index 94% rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java rename to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java index df0e69c4d5..a95f09fa23 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.ozone.client.rpc.ha; +package org.apache.hadoop.ozone.om.ha; /** * This package contains Ozone Client's OM Proxy classes. diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index 157368287a..86fd62490c 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -15,8 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.ozone.om.protocol; + import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; @@ -384,5 +387,11 @@ OmMultipartUploadListParts listParts(String volumeName, String bucketName, * @throws IOException */ S3SecretValue getS3Secret(String kerberosID) throws IOException; + + /** + * Get the OM Client's Retry and Failover Proxy provider. + * @return OMFailoverProxyProvider + */ + OMFailoverProxyProvider getOMFailoverProxyProvider(); } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 51ce94fba9..63a656c9f9 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -17,18 +17,26 @@ */ package org.apache.hadoop.ozone.om.protocolPB; +import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -103,6 +111,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo; import org.apache.hadoop.ozone.protocolPB.OMPBHelper; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; @@ -112,6 +121,9 @@ import com.google.common.base.Strings; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER; import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED; @@ -132,19 +144,90 @@ public final class OzoneManagerProtocolClientSideTranslatorPB */ private static final RpcController NULL_RPC_CONTROLLER = null; + private final OMFailoverProxyProvider omFailoverProxyProvider; private final OzoneManagerProtocolPB rpcProxy; private final String clientID; + private static final Logger FAILOVER_PROXY_PROVIDER_LOG = + LoggerFactory.getLogger(OMFailoverProxyProvider.class); + + public OzoneManagerProtocolClientSideTranslatorPB( + OzoneManagerProtocolPB proxy, String clientId) { + this.rpcProxy = proxy; + this.clientID = clientId; + this.omFailoverProxyProvider = null; + } /** - * Constructor for KeySpaceManger Client. - * @param rpcProxy + * Constructor for OM Protocol Client. This creates a {@link RetryProxy} + * over {@link OMFailoverProxyProvider} proxy. OMFailoverProxyProvider has + * one {@link OzoneManagerProtocolPB} proxy pointing to each OM node in the + * cluster. */ - public OzoneManagerProtocolClientSideTranslatorPB( - OzoneManagerProtocolPB rpcProxy, String clientId) { - this.rpcProxy = rpcProxy; + public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf, + String clientId, UserGroupInformation ugi) throws IOException { + this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi); + + int maxRetries = conf.getInt( + OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, + OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT); + int maxFailovers = conf.getInt( + OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, + OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); + int sleepBase = conf.getInt( + OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, + OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT); + int sleepMax = conf.getInt( + OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY, + OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT); + + this.rpcProxy = TracingUtil.createProxy( + createRetryProxy(omFailoverProxyProvider, maxRetries, maxFailovers, + sleepBase, sleepMax), + OzoneManagerProtocolPB.class); this.clientID = clientId; } + /** + * Creates a {@link RetryProxy} encapsulating the + * {@link OMFailoverProxyProvider}. The retry proxy fails over on network + * exception or if the current proxy is not the leader OM. + */ + private OzoneManagerProtocolPB createRetryProxy( + OMFailoverProxyProvider failoverProxyProvider, + int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) { + RetryPolicy retryPolicyOnNetworkException = RetryPolicies + .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, + maxFailovers, maxRetries, delayMillis, maxDelayBase); + RetryPolicy retryPolicy = new RetryPolicy() { + @Override + public RetryAction shouldRetry(Exception exception, int retries, + int failovers, boolean isIdempotentOrAtMostOnce) + throws Exception { + if (exception instanceof EOFException || + exception instanceof ServiceException) { + if (retries < maxRetries && failovers < maxFailovers) { + return RetryAction.FAILOVER_AND_RETRY; + } else { + FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " + + "Attempted {} retries and {} failovers", retries, failovers); + return RetryAction.FAIL; + } + } else { + return retryPolicyOnNetworkException.shouldRetry( + exception, retries, failovers, isIdempotentOrAtMostOnce); + } + } + }; + OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create( + OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy); + return proxy; + } + + @VisibleForTesting + public OMFailoverProxyProvider getOMFailoverProxyProvider() { + return omFailoverProxyProvider; + } + /** * Closes this stream and releases any system resources associated * with it. If the stream is already closed then invoking this @@ -196,7 +279,19 @@ private OMResponse submitRequest(OMRequest omRequest) OMRequest payload = OMRequest.newBuilder(omRequest) .setTraceID(TracingUtil.exportCurrentSpan()) .build(); - return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); + + OMResponse omResponse = + rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload); + + if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) { + String leaderOmId = omResponse.getLeaderOMNodeId(); + + // Failover to the OM node returned by OMReponse leaderOMNodeId if + // current proxy is not pointing to that node. + omFailoverProxyProvider.performFailoverIfRequired(leaderOmId); + } + + return omResponse; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index aaf3c859d5..b1168260c8 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -140,6 +140,8 @@ message OMResponse { required Status status = 5; + optional string leaderOMNodeId = 6; + optional CreateVolumeResponse createVolumeResponse = 11; optional SetVolumePropertyResponse setVolumePropertyResponse = 12; optional CheckVolumeAccessResponse checkVolumeAccessResponse = 13; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java index a1ef1f6424..f84f95ea8e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java @@ -32,7 +32,9 @@ import java.io.IOException; import java.net.BindException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -48,6 +50,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { private static final Logger LOG = LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class); + private Map ozoneManagerMap; private List ozoneManagers; private static final Random RANDOM = new Random(); @@ -63,11 +66,12 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { private MiniOzoneHAClusterImpl( OzoneConfiguration conf, - List omList, + Map omMap, StorageContainerManager scm, List hddsDatanodes) { super(conf, scm, hddsDatanodes); - this.ozoneManagers = omList; + this.ozoneManagerMap = omMap; + this.ozoneManagers = new ArrayList<>(omMap.values()); } /** @@ -107,6 +111,10 @@ public void stopOzoneManager(int index) { ozoneManagers.get(index).stop(); } + public void stopOzoneManager(String omNodeId) { + ozoneManagerMap.get(omNodeId).stop(); + } + /** * Builder for configuring the MiniOzoneCluster to run. */ @@ -128,17 +136,17 @@ public MiniOzoneCluster build() throws IOException { DefaultMetricsSystem.setMiniClusterMode(true); initializeConfiguration(); StorageContainerManager scm; - List omList; + Map omMap; try { scm = createSCM(); scm.start(); - omList = createOMService(); + omMap = createOMService(); } catch (AuthenticationException ex) { throw new IOException("Unable to build MiniOzoneCluster. ", ex); } final List hddsDatanodes = createHddsDatanodes(scm); - MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList, + MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap, scm, hddsDatanodes); if (startDataNodes) { cluster.startHddsDatanodes(); @@ -171,10 +179,10 @@ void initializeConfiguration() throws IOException { * @throws IOException * @throws AuthenticationException */ - private List createOMService() throws IOException, + private Map createOMService() throws IOException, AuthenticationException { - List omList = new ArrayList<>(numOfOMs); + Map omMap = new HashMap<>(); int retryCount = 0; int basePort = 10000; @@ -186,10 +194,11 @@ private List createOMService() throws IOException, for (int i = 1; i<= numOfOMs; i++) { // Set nodeId - conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i); + String nodeId = nodeIdBaseStr + i; + conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId); // Set metadata/DB dir base path - String metaDirPath = path + "/" + nodeIdBaseStr + i; + String metaDirPath = path + "/" + nodeId; conf.set(OZONE_METADATA_DIRS, metaDirPath); OMStorage omStore = new OMStorage(conf); initializeOmStorage(omStore); @@ -201,7 +210,7 @@ private List createOMService() throws IOException, OzoneManager om = OzoneManager.createOm(null, conf); om.setCertClient(certClient); - omList.add(om); + omMap.put(nodeId, om); om.start(); LOG.info("Started OzoneManager RPC server at " + @@ -211,23 +220,24 @@ private List createOMService() throws IOException, // Set default OM address to point to the first OM. Clients would // try connecting to this address by default conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, - NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr())); + NetUtils.getHostPortString(omMap.get(nodeIdBaseStr + 1) + .getOmRpcServerAddr())); break; } catch (BindException e) { - for (OzoneManager om : omList) { + for (OzoneManager om : omMap.values()) { om.stop(); om.join(); LOG.info("Stopping OzoneManager server at " + om.getOmRpcServerAddr()); } - omList.clear(); + omMap.clear(); ++retryCount; LOG.info("MiniOzoneHACluster port conflicts, retried " + retryCount + " times"); } } - return omList; + return omMap; } /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index 32792ae149..0828fe833c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -65,8 +65,6 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.rest.OzoneException; -import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo; -import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider; import org.apache.hadoop.ozone.common.OzoneChecksumException; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.interfaces.Container; @@ -76,6 +74,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; +import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -189,9 +188,10 @@ public static void setScmId(String scmId){ */ @Test public void testOMClientProxyProvider() { - OMProxyProvider omProxyProvider = store.getClientProxy() + OMFailoverProxyProvider omFailoverProxyProvider = store.getClientProxy() .getOMProxyProvider(); - List omProxies = omProxyProvider.getOMProxies(); + List omProxies = + omFailoverProxyProvider.getOMProxies(); // For a non-HA OM service, there should be only one OM proxy. Assert.assertEquals(1, omProxies.size()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index 62cda916d1..da8f870650 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -18,30 +18,29 @@ import org.apache.commons.lang3.RandomStringUtils; -import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdfs.LogVerificationAppender; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo; -import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider; -import org.apache.hadoop.ozone.om.exceptions.OMException; -import org.apache.hadoop.ozone.web.handlers.UserArgs; -import org.apache.hadoop.ozone.web.handlers.VolumeArgs; -import org.apache.hadoop.ozone.web.interfaces.StorageHandler; -import org.apache.hadoop.ozone.web.response.VolumeInfo; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.VolumeArgs; +import org.apache.log4j.Logger; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.List; import java.util.UUID; @@ -49,6 +48,14 @@ import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl .NODE_FAILURE_TIMEOUT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; @@ -58,8 +65,7 @@ public class TestOzoneManagerHA { private MiniOzoneHAClusterImpl cluster = null; - private StorageHandler storageHandler; - private UserArgs userArgs; + private ObjectStore objectStore; private OzoneConfiguration conf; private String clusterId; private String scmId; @@ -69,7 +75,7 @@ public class TestOzoneManagerHA { public ExpectedException exception = ExpectedException.none(); @Rule - public Timeout timeout = new Timeout(60_000); + public Timeout timeout = new Timeout(120_000); /** * Create a MiniDFSCluster for testing. @@ -85,6 +91,9 @@ public void init() throws Exception { scmId = UUID.randomUUID().toString(); conf.setBoolean(OZONE_ACL_ENABLED, true); conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); + conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3); + conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3); + conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50); cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) .setClusterId(clusterId) @@ -93,9 +102,7 @@ public void init() throws Exception { .setNumOfOzoneManagers(numOfOMs) .build(); cluster.waitForClusterToBeReady(); - storageHandler = new ObjectStoreHandler(conf).getStorageHandler(); - userArgs = new UserArgs(null, OzoneUtils.getRequestID(), - null, null, null, null); + objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore(); } /** @@ -115,7 +122,7 @@ public void shutdown() { */ @Test public void testAllOMNodesRunning() throws Exception { - testCreateVolume(true); + createVolumeTest(true); } /** @@ -126,52 +133,56 @@ public void testOneOMNodeDown() throws Exception { cluster.stopOzoneManager(1); Thread.sleep(NODE_FAILURE_TIMEOUT * 2); - testCreateVolume(true); + createVolumeTest(true); } /** * Test client request fails when 2 OMs are down. */ @Test - @Ignore("TODO:HDDS-1158") public void testTwoOMNodesDown() throws Exception { cluster.stopOzoneManager(1); cluster.stopOzoneManager(2); Thread.sleep(NODE_FAILURE_TIMEOUT * 2); - testCreateVolume(false); + createVolumeTest(false); } /** * Create a volume and test its attribute. */ - private void testCreateVolume(boolean checkSuccess) throws Exception { + private void createVolumeTest(boolean checkSuccess) throws Exception { String userName = "user" + RandomStringUtils.randomNumeric(5); String adminName = "admin" + RandomStringUtils.randomNumeric(5); String volumeName = "volume" + RandomStringUtils.randomNumeric(5); - VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs); - createVolumeArgs.setUserName(userName); - createVolumeArgs.setAdminName(adminName); + VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() + .setOwner(userName) + .setAdmin(adminName) + .build(); try { - storageHandler.createVolume(createVolumeArgs); + objectStore.createVolume(volumeName, createVolumeArgs); - VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs); - VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs); + OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); if (checkSuccess) { - Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName)); - Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName)); + Assert.assertTrue(retVolumeinfo.getName().equals(volumeName)); + Assert.assertTrue(retVolumeinfo.getOwner().equals(userName)); + Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName)); } else { // Verify that the request failed - Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty()); Assert.fail("There is no quorum. Request should have failed"); } - } catch (OMException e) { + } catch (ConnectException | RemoteException e) { if (!checkSuccess) { - GenericTestUtils.assertExceptionContains( - "RaftRetryFailureException", e); + // If the last OM to be tried by the RetryProxy is down, we would get + // ConnectException. Otherwise, we would get a RemoteException from the + // last running OM as it would fail to get a quorum. + if (e instanceof RemoteException) { + GenericTestUtils.assertExceptionContains( + "RaftRetryFailureException", e); + } } else { throw e; } @@ -179,14 +190,16 @@ private void testCreateVolume(boolean checkSuccess) throws Exception { } /** - * Test that OMProxyProvider creates an OM proxy for each OM in the cluster. + * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the + * cluster. */ @Test - public void testOMClientProxyProvide() throws Exception { + public void testOMProxyProviderInitialization() throws Exception { OzoneClient rpcClient = cluster.getRpcClient(); - OMProxyProvider omProxyProvider = + OMFailoverProxyProvider omFailoverProxyProvider = rpcClient.getObjectStore().getClientProxy().getOMProxyProvider(); - List omProxies = omProxyProvider.getOMProxies(); + List omProxies = + omFailoverProxyProvider.getOMProxies(); Assert.assertEquals(numOfOMs, omProxies.size()); @@ -194,7 +207,7 @@ public void testOMClientProxyProvide() throws Exception { InetSocketAddress omRpcServerAddr = cluster.getOzoneManager(i).getOmRpcServerAddr(); boolean omClientProxyExists = false; - for (OMProxyInfo omProxyInfo : omProxies) { + for (OMFailoverProxyProvider.OMProxyInfo omProxyInfo : omProxies) { if (omProxyInfo.getAddress().equals(omRpcServerAddr)) { omClientProxyExists = true; break; @@ -205,4 +218,99 @@ public void testOMClientProxyProvide() throws Exception { omClientProxyExists); } } + + /** + * Test OMFailoverProxyProvider failover on connection exception to OM client. + */ + @Test + public void testOMProxyProviderFailoverOnConnectionFailure() + throws Exception { + OMFailoverProxyProvider omFailoverProxyProvider = + objectStore.getClientProxy().getOMProxyProvider(); + String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); + + createVolumeTest(true); + + // On stopping the current OM Proxy, the next connection attempt should + // failover to a another OM proxy. + cluster.stopOzoneManager(firstProxyNodeId); + Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT * 4); + + // Next request to the proxy provider should result in a failover + createVolumeTest(true); + Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT); + + // Get the new OM Proxy NodeId + String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); + + // Verify that a failover occured. the new proxy nodeId should be + // different from the old proxy nodeId. + Assert.assertNotEquals("Failover did not occur as expected", + firstProxyNodeId, newProxyNodeId); + } + + /** + * Test OMFailoverProxyProvider failover when current OM proxy is not + * the current OM Leader. + */ + @Test + public void testOMProxyProviderFailoverToCurrentLeader() throws Exception { + OMFailoverProxyProvider omFailoverProxyProvider = + objectStore.getClientProxy().getOMProxyProvider(); + + // Run couple of createVolume tests to discover the current Leader OM + createVolumeTest(true); + createVolumeTest(true); + + // The OMFailoverProxyProvider will point to the current leader OM node. + String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); + + // Perform a manual failover of the proxy provider to move the + // currentProxyIndex to a node other than the leader OM. + omFailoverProxyProvider.performFailover( + omFailoverProxyProvider.getProxy().proxy); + + String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId(); + Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId); + + // Once another request is sent to this new proxy node, the leader + // information must be returned via the response and a failover must + // happen to the leader proxy node. + createVolumeTest(true); + Thread.sleep(2000); + + String newLeaderOMNodeId = + omFailoverProxyProvider.getCurrentProxyOMNodeId(); + + // The old and new Leader OM NodeId must match since there was no new + // election in the Ratis ring. + Assert.assertEquals(leaderOMNodeId, newLeaderOMNodeId); + } + + @Test + public void testOMRetryProxy() throws Exception { + // Stop all the OMs. After making 5 (set maxRetries value) attempts at + // connection, the RpcClient should give up. + for (int i = 0; i < numOfOMs; i++) { + cluster.stopOzoneManager(i); + } + + final LogVerificationAppender appender = new LogVerificationAppender(); + final org.apache.log4j.Logger logger = Logger.getRootLogger(); + logger.addAppender(appender); + + try { + createVolumeTest(true); + Assert.fail("TestOMRetryProxy should fail when there are no OMs running"); + } catch (ConnectException e) { + // Each retry attempt tries upto 10 times to connect. So there should be + // 3*10 "Retrying connect to server" messages + Assert.assertEquals(30, + appender.countLinesWithMessage("Retrying connect to server:")); + + Assert.assertEquals(1, + appender.countLinesWithMessage("Failed to connect to OM. Attempted " + + "3 retries and 3 failovers")); + } + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 17552368bb..fc4ad01801 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -70,6 +70,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.OzoneIllegalArgumentException; import org.apache.hadoop.ozone.OzoneSecurityUtil; +import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.security.OzoneSecurityException; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; @@ -2610,4 +2611,9 @@ public CertificateClient getCertificateClient() { public String getComponent() { return omComponent; } + + @Override + public OMFailoverProxyProvider getOMFailoverProxyProvider() { + return null; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java index 9115421d8e..8e4582d660 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java @@ -32,6 +32,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.retry.RetryPolicy; @@ -100,10 +101,12 @@ static Message convertResponseToMessage(OMResponse response) { return Message.valueOf(ByteString.copyFrom(requestBytes)); } - static OMResponse convertByteStringToOMResponse(ByteString byteString) + static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply) throws InvalidProtocolBufferException { - byte[] bytes = byteString.toByteArray(); - return OMResponse.parseFrom(bytes); + byte[] bytes = reply.getMessage().getContent().toByteArray(); + return OMResponse.newBuilder(OMResponse.parseFrom(bytes)) + .setLeaderOMNodeId(reply.getReplierId()) + .build(); } static OMResponse getErrorResponse(Type cmdType, Exception e) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java index 9e1cafc30c..1b4c6347d9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java @@ -23,7 +23,10 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; + import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.ServiceException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMConfigKeys; @@ -53,24 +56,24 @@ public final class OzoneManagerRatisClient implements Closeable { OzoneManagerRatisClient.class); private final RaftGroup raftGroup; - private final String omID; + private final String omNodeID; private final RpcType rpcType; private RaftClient raftClient; private final RetryPolicy retryPolicy; private final Configuration conf; - private OzoneManagerRatisClient(String omId, RaftGroup raftGroup, + private OzoneManagerRatisClient(String omNodeId, RaftGroup raftGroup, RpcType rpcType, RetryPolicy retryPolicy, Configuration config) { this.raftGroup = raftGroup; - this.omID = omId; + this.omNodeID = omNodeId; this.rpcType = rpcType; this.retryPolicy = retryPolicy; this.conf = config; } public static OzoneManagerRatisClient newOzoneManagerRatisClient( - String omId, RaftGroup raftGroup, Configuration conf) { + String omNodeId, RaftGroup raftGroup, Configuration conf) { final String rpcType = conf.get( OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY, OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT); @@ -87,19 +90,19 @@ public static OzoneManagerRatisClient newOzoneManagerRatisClient( final RetryPolicy retryPolicy = RetryPolicies .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration); - return new OzoneManagerRatisClient(omId, raftGroup, + return new OzoneManagerRatisClient(omNodeId, raftGroup, SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf); } public void connect() { LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}", - raftGroup.getGroupId().getUuid().toString(), omID); + raftGroup.getGroupId().getUuid().toString(), omNodeID); // TODO : XceiverClient ratis should pass the config value of // maxOutstandingRequests so as to set the upper bound on max no of async // requests to be handled by raft client - raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup, + raftClient = OMRatisHelper.newRaftClient(rpcType, omNodeID, raftGroup, retryPolicy, conf); } @@ -119,13 +122,12 @@ public void close() { * @param request Request * @return Response to the command */ - public OMResponse sendCommand(OMRequest request) { + public OMResponse sendCommand(OMRequest request) throws ServiceException { try { CompletableFuture reply = sendCommandAsync(request); return reply.get(); } catch (ExecutionException | InterruptedException e) { - LOG.error("Failed to execute command: " + request, e); - return OMRatisHelper.getErrorResponse(request.getCmdType(), e); + throw new ServiceException(e); } } @@ -152,9 +154,10 @@ private CompletableFuture sendCommandAsync(OMRequest request) { if (raftRetryFailureException != null) { throw new CompletionException(raftRetryFailureException); } + OMResponse response = OMRatisHelper - .convertByteStringToOMResponse(reply.getMessage() - .getContent()); + .getOMResponseFromRaftClientReply(reply); + return response; } catch (InvalidProtocolBufferException e) { throw new CompletionException(e); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 5684fa561b..2f1d64d894 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -80,7 +80,8 @@ public OMResponse submitRequest(RpcController controller, /** * Submits request to OM's Ratis server. */ - private OMResponse submitRequestToRatis(OMRequest request) { + private OMResponse submitRequestToRatis(OMRequest request) + throws ServiceException { return omRatisClient.sendCommand(request); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java index 83d2245b40..8a8be357c8 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java @@ -34,8 +34,6 @@ import org.apache.hadoop.ozone.om.OMNodeDetails; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.test.GenericTestUtils; import org.apache.ratis.protocol.RaftGroupId; @@ -109,27 +107,6 @@ public void testStartOMRatisServer() throws Exception { LifeCycle.State.RUNNING, omRatisServer.getServerState()); } - /** - * Submit any request to OM Ratis server and check that the dummy response - * message is received. - */ - @Test - public void testSubmitRatisRequest() throws Exception { - // Wait for leader election - Thread.sleep(LEADER_ELECTION_TIMEOUT * 2); - OMRequest request = OMRequest.newBuilder() - .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume) - .setClientId(clientId) - .build(); - - OMResponse response = omRatisClient.sendCommand(request); - - Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateVolume, - response.getCmdType()); - Assert.assertEquals(false, response.getSuccess()); - Assert.assertEquals(false, response.hasCreateVolumeResponse()); - } - /** * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are * categorized in {@link OmUtils#isReadOnly(OMRequest)}.