diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index cd40f7c091..0d73905cd7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -379,6 +379,23 @@ public final class OzoneConfigKeys {
public static final String OZONE_FS_ISOLATED_CLASSLOADER =
"ozone.fs.isolated-classloader";
+ // Ozone Client Retry and Failover configurations
+ public static final String OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY =
+ "ozone.client.retry.max.attempts";
+ public static final int OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT =
+ 10;
+ public static final String OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY =
+ "ozone.client.failover.max.attempts";
+ public static final int OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT =
+ 15;
+ public static final String OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY =
+ "ozone.client.failover.sleep.base.millis";
+ public static final int OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT =
+ 500;
+ public static final String OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY =
+ "ozone.client.failover.sleep.max.millis";
+ public static final int OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT =
+ 15000;
public static final String OZONE_FREON_HTTP_ENABLED_KEY =
"ozone.freon.http.enabled";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 45b46b843a..8e3b02a950 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -276,4 +276,7 @@ private OzoneConsts() {
// Default OMServiceID for OM Ratis servers to use as RaftGroupId
public static final String OM_SERVICE_ID_DEFAULT = "omServiceIdDefault";
+
+ // Dummy OMNodeID for OM Clients to use for a non-HA OM setup
+ public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 8469fdce20..f7fecb7b67 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2029,4 +2029,45 @@
-
+
+ ozone.client.retry.max.attempts
+ 10
+
+ Max retry attempts for Ozone RpcClient talking to OzoneManagers.
+
+
+
+ ozone.client.failover.max.attempts
+ 15
+
+ Expert only. The number of client failover attempts that should be
+ made before the failover is considered failed.
+
+
+
+ ozone.client.failover.sleep.base.millis
+ 500
+
+ Expert only. The time to wait, in milliseconds, between failover
+ attempts increases exponentially as a function of the number of
+ attempts made so far, with a random factor of +/- 50%. This option
+ specifies the base value used in the failover calculation. The
+ first failover will retry immediately. The 2nd failover attempt
+ will delay at least ozone.client.failover.sleep.base.millis
+ milliseconds. And so on.
+
+
+
+ ozone.client.failover.sleep.max.millis
+ 15000
+
+ Expert only. The time to wait, in milliseconds, between failover
+ attempts increases exponentially as a function of the number of
+ attempts made so far, with a random factor of +/- 50%. This option
+ specifies the maximum value to wait between failovers.
+ Specifically, the time between two failover attempts will not
+ exceed +/- 50% of ozone.client.failover.sleep.max.millis
+ milliseconds.
+
+
+
\ No newline at end of file
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 494afae7e8..2bf90892c7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -29,7 +29,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@@ -510,5 +510,5 @@ void cancelDelegationToken(Token token)
S3SecretValue getS3Secret(String kerberosID) throws IOException;
@VisibleForTesting
- OMProxyProvider getOMProxyProvider();
+ OMFailoverProxyProvider getOMProxyProvider();
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
index b69d9728bf..eea28091d3 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
@@ -42,8 +42,8 @@
import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -725,7 +725,7 @@ public S3SecretValue getS3Secret(String kerberosID) throws IOException {
}
@Override
- public OMProxyProvider getOMProxyProvider() {
+ public OMFailoverProxyProvider getOMProxyProvider() {
return null;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 0875046bda..7fab65041f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -50,8 +50,8 @@
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -66,7 +66,8 @@
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.om.protocolPB
+ .OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -85,6 +86,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.io.Text;
import org.apache.logging.log4j.util.Strings;
+import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,7 +109,6 @@ public class RpcClient implements ClientProtocol {
private final OzoneConfiguration conf;
private final StorageContainerLocationProtocol
storageContainerLocationClient;
- private final OMProxyProvider omProxyProvider;
private final OzoneManagerProtocol ozoneManagerClient;
private final XceiverClientManager xceiverClientManager;
private final int chunkSize;
@@ -121,6 +122,7 @@ public class RpcClient implements ClientProtocol {
private final long streamBufferMaxSize;
private final long blockSize;
private final long watchTimeout;
+ private final ClientId clientId = ClientId.randomId();
/**
* Creates RpcClient instance with the given configuration.
@@ -135,13 +137,8 @@ public RpcClient(Configuration conf) throws IOException {
OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
this.groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS,
OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
- RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
- ProtobufRpcEngine.class);
- this.omProxyProvider = new OMProxyProvider(conf, ugi);
- this.ozoneManagerClient =
- TracingUtil.createProxy(
- this.omProxyProvider.getProxy(),
- OzoneManagerProtocol.class);
+ this.ozoneManagerClient = new OzoneManagerProtocolClientSideTranslatorPB(
+ this.conf, clientId.toString(), ugi);
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
@@ -492,8 +489,8 @@ public S3SecretValue getS3Secret(String kerberosID) throws IOException {
@Override
@VisibleForTesting
- public OMProxyProvider getOMProxyProvider() {
- return omProxyProvider;
+ public OMFailoverProxyProvider getOMProxyProvider() {
+ return ozoneManagerClient.getOMFailoverProxyProvider();
}
@Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
deleted file mode 100644
index 01e55620b9..0000000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.rpc.ha;
-
-import org.apache.hadoop.ozone.om.protocolPB
- .OzoneManagerProtocolClientSideTranslatorPB;
-
-import java.net.InetSocketAddress;
-
-/**
- * Proxy information of OM.
- */
-public final class OMProxyInfo {
- private InetSocketAddress address;
- private OzoneManagerProtocolClientSideTranslatorPB omClient;
-
- public OMProxyInfo(InetSocketAddress addr) {
- this.address = addr;
- }
-
- public InetSocketAddress getAddress() {
- return address;
- }
-
- public OzoneManagerProtocolClientSideTranslatorPB getOMProxy() {
- return omClient;
- }
-
- public void setOMProxy(
- OzoneManagerProtocolClientSideTranslatorPB clientProxy) {
- this.omClient = clientProxy;
- }
-}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java
deleted file mode 100644
index 574cb5f131..0000000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.rpc.ha;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.om.protocolPB
- .OzoneManagerProtocolClientSideTranslatorPB;
-import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ratis.protocol.ClientId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
-
-/**
- * A failover proxy provider implementation which allows clients to configure
- * multiple OMs to connect to. In case of OM failover, client can try
- * connecting to another OM node from the list of proxies.
- */
-public class OMProxyProvider implements Closeable {
-
- public static final Logger LOG =
- LoggerFactory.getLogger(OMProxyProvider.class);
-
- private List omProxies;
-
- private int currentProxyIndex = 0;
-
- private final Configuration conf;
- private final long omVersion;
- private final UserGroupInformation ugi;
- private ClientId clientId = ClientId.randomId();
-
- public OMProxyProvider(Configuration configuration,
- UserGroupInformation ugi) {
- this.conf = configuration;
- this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
- this.ugi = ugi;
- loadOMClientConfigs(conf);
- }
-
- private void loadOMClientConfigs(Configuration config) {
- this.omProxies = new ArrayList<>();
-
- Collection omServiceIds = config.getTrimmedStringCollection(
- OZONE_OM_SERVICE_IDS_KEY);
-
- if (omServiceIds.size() > 1) {
- throw new IllegalArgumentException("Multi-OM Services is not supported." +
- " Please configure only one OM Service ID in " +
- OZONE_OM_SERVICE_IDS_KEY);
- }
-
- for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
- Collection omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
-
- for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
- String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
- serviceId, nodeId);
- String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
- if (rpcAddrStr == null) {
- continue;
- }
-
- InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
-
- // Add the OM client proxy info to list of proxies
- if (addr != null) {
- OMProxyInfo omProxyInfo = new OMProxyInfo(addr);
- omProxies.add(omProxyInfo);
- } else {
- LOG.error("Failed to create OM proxy at address {}", rpcAddrStr);
- }
- }
- }
-
- if (omProxies.isEmpty()) {
- throw new IllegalArgumentException("Could not find any configured " +
- "addresses for OM. Please configure the system with "
- + OZONE_OM_ADDRESS_KEY);
- }
- }
-
- private OzoneManagerProtocolClientSideTranslatorPB getOMClient(
- InetSocketAddress omAddress) throws IOException {
- return new OzoneManagerProtocolClientSideTranslatorPB(
- RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
- conf, NetUtils.getDefaultSocketFactory(conf),
- Client.getRpcTimeout(conf)), clientId.toString());
- }
-
- /**
- * Get the proxy object which should be used until the next failover event
- * occurs. RPC proxy object is intialized lazily.
- * @return the OM proxy object to invoke methods upon
- */
- public synchronized OzoneManagerProtocolClientSideTranslatorPB getProxy() {
- OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyIndex);
- return createOMClientIfNeeded(currentOMProxyInfo);
- }
-
- private OzoneManagerProtocolClientSideTranslatorPB createOMClientIfNeeded(
- OMProxyInfo proxyInfo) {
- if (proxyInfo.getOMProxy() == null) {
- try {
- proxyInfo.setOMProxy(getOMClient(proxyInfo.getAddress()));
- } catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to OM at {}",
- this.getClass().getSimpleName(), proxyInfo.getAddress(), ioe);
- throw new RuntimeException(ioe);
- }
- }
- return proxyInfo.getOMProxy();
- }
-
- /**
- * Called whenever an error warrants failing over. It is determined by the
- * retry policy.
- */
- public void performFailover() {
- incrementProxyIndex();
- }
-
- synchronized void incrementProxyIndex() {
- currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
- }
-
- /**
- * Close all the proxy objects which have been opened over the lifetime of
- * the proxy provider.
- */
- @Override
- public synchronized void close() throws IOException {
- for (OMProxyInfo proxy : omProxies) {
- OzoneManagerProtocolClientSideTranslatorPB omProxy = proxy.getOMProxy();
- if (omProxy != null) {
- RPC.stopProxy(omProxy);
- }
- }
- }
-
- @VisibleForTesting
- public List getOMProxies() {
- return omProxies;
- }
-}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
new file mode 100644
index 0000000000..5c1b39fc0a
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+
+/**
+ * A failover proxy provider implementation which allows clients to configure
+ * multiple OMs to connect to. In case of OM failover, client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class OMFailoverProxyProvider implements
+ FailoverProxyProvider, Closeable {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(OMFailoverProxyProvider.class);
+
+ // Map of OMNodeID to its proxy
+ private Map omProxies;
+ private List omNodeIDList;
+
+ private String currentProxyOMNodeId;
+ private int currentProxyIndex;
+
+ private final Configuration conf;
+ private final long omVersion;
+ private final UserGroupInformation ugi;
+
+ public OMFailoverProxyProvider(OzoneConfiguration configuration,
+ UserGroupInformation ugi) throws IOException {
+ this.conf = configuration;
+ this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
+ this.ugi = ugi;
+ loadOMClientConfigs(conf);
+
+ currentProxyIndex = 0;
+ currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
+ }
+
+ /**
+ * Class to store proxy information.
+ */
+ public final class OMProxyInfo
+ extends FailoverProxyProvider.ProxyInfo {
+ private InetSocketAddress address;
+
+ OMProxyInfo(OzoneManagerProtocolPB proxy, String proxyInfoStr,
+ InetSocketAddress addr) {
+ super(proxy, proxyInfoStr);
+ this.address = addr;
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+ }
+
+ private void loadOMClientConfigs(Configuration config) throws IOException {
+ this.omProxies = new HashMap<>();
+ this.omNodeIDList = new ArrayList<>();
+
+ Collection omServiceIds = config.getTrimmedStringCollection(
+ OZONE_OM_SERVICE_IDS_KEY);
+
+ if (omServiceIds.size() > 1) {
+ throw new IllegalArgumentException("Multi-OM Services is not supported." +
+ " Please configure only one OM Service ID in " +
+ OZONE_OM_SERVICE_IDS_KEY);
+ }
+
+ for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
+ Collection omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
+
+ for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+ String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+ serviceId, nodeId);
+ String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+ if (rpcAddrStr == null) {
+ continue;
+ }
+
+ InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
+
+ // Add the OM client proxy info to list of proxies
+ if (addr != null) {
+ StringBuilder proxyInfo = new StringBuilder()
+ .append(nodeId).append("(")
+ .append(NetUtils.getHostPortString(addr)).append(")");
+ OMProxyInfo omProxyInfo = new OMProxyInfo(null,
+ proxyInfo.toString(), addr);
+
+ // For a non-HA OM setup, nodeId might be null. If so, we assign it
+ // a dummy value
+ if (nodeId == null) {
+ nodeId = OzoneConsts.OM_NODE_ID_DUMMY;
+ }
+ omProxies.put(nodeId, omProxyInfo);
+ omNodeIDList.add(nodeId);
+
+ } else {
+ LOG.error("Failed to create OM proxy for {} at address {}",
+ nodeId, rpcAddrStr);
+ }
+ }
+ }
+
+ if (omProxies.isEmpty()) {
+ throw new IllegalArgumentException("Could not find any configured " +
+ "addresses for OM. Please configure the system with "
+ + OZONE_OM_ADDRESS_KEY);
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized String getCurrentProxyOMNodeId() {
+ return currentProxyOMNodeId;
+ }
+
+ private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress)
+ throws IOException {
+ RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
+ ProtobufRpcEngine.class);
+ return RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
+ conf, NetUtils.getDefaultSocketFactory(conf),
+ Client.getRpcTimeout(conf));
+ }
+
+ /**
+ * Get the proxy object which should be used until the next failover event
+ * occurs. RPC proxy object is intialized lazily.
+ * @return the OM proxy object to invoke methods upon
+ */
+ @Override
+ public synchronized OMProxyInfo getProxy() {
+ OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyOMNodeId);
+ createOMProxyIfNeeded(currentOMProxyInfo);
+ return currentOMProxyInfo;
+ }
+
+ /**
+ * Creates OM proxy object if it does not already exist.
+ */
+ private OMProxyInfo createOMProxyIfNeeded(OMProxyInfo proxyInfo) {
+ if (proxyInfo.proxy == null) {
+ try {
+ proxyInfo.proxy = createOMProxy(proxyInfo.address);
+ } catch (IOException ioe) {
+ LOG.error("{} Failed to create RPC proxy to OM at {}",
+ this.getClass().getSimpleName(), proxyInfo.address, ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+ return proxyInfo;
+ }
+
+ /**
+ * Called whenever an error warrants failing over. It is determined by the
+ * retry policy.
+ */
+ @Override
+ public void performFailover(OzoneManagerProtocolPB currentProxy) {
+ int newProxyIndex = incrementProxyIndex();
+ LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
+ newProxyIndex, omNodeIDList.get(newProxyIndex));
+ }
+
+ /**
+ * Update the proxy index to the next proxy in the list.
+ * @return the new proxy index
+ */
+ private synchronized int incrementProxyIndex() {
+ currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
+ currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
+ return currentProxyIndex;
+ }
+
+ @Override
+ public Class getInterface() {
+ return OzoneManagerProtocolPB.class;
+ }
+
+ /**
+ * Performs failover if the leaderOMNodeId returned through OMReponse does
+ * not match the current leaderOMNodeId cached by the proxy provider.
+ */
+ public void performFailoverIfRequired(String newLeaderOMNodeId) {
+ if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
+ LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
+ }
+ }
+
+ /**
+ * Failover to the OM proxy specified by the new leader OMNodeId.
+ * @param newLeaderOMNodeId OMNodeId to failover to.
+ * @return true if failover is successful, false otherwise.
+ */
+ synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
+ if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
+ if (omProxies.containsKey(newLeaderOMNodeId)) {
+ currentProxyOMNodeId = newLeaderOMNodeId;
+ currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Close all the proxy objects which have been opened over the lifetime of
+ * the proxy provider.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ for (OMProxyInfo proxy : omProxies.values()) {
+ OzoneManagerProtocolPB omProxy = proxy.proxy;
+ if (omProxy != null) {
+ RPC.stopProxy(omProxy);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public List getOMProxies() {
+ return new ArrayList<>(omProxies.values());
+ }
+}
+
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
similarity index 94%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
rename to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
index df0e69c4d5..a95f09fa23 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.ozone.client.rpc.ha;
+package org.apache.hadoop.ozone.om.ha;
/**
* This package contains Ozone Client's OM Proxy classes.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 157368287a..86fd62490c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -15,8 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.ozone.om.protocol;
+
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@@ -384,5 +387,11 @@ OmMultipartUploadListParts listParts(String volumeName, String bucketName,
* @throws IOException
*/
S3SecretValue getS3Secret(String kerberosID) throws IOException;
+
+ /**
+ * Get the OM Client's Retry and Failover Proxy provider.
+ * @return OMFailoverProxyProvider
+ */
+ OMFailoverProxyProvider getOMFailoverProxyProvider();
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 51ce94fba9..63a656c9f9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -17,18 +17,26 @@
*/
package org.apache.hadoop.ozone.om.protocolPB;
+import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -103,6 +111,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
@@ -112,6 +121,9 @@
import com.google.common.base.Strings;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
@@ -132,19 +144,90 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
+ private final OMFailoverProxyProvider omFailoverProxyProvider;
private final OzoneManagerProtocolPB rpcProxy;
private final String clientID;
+ private static final Logger FAILOVER_PROXY_PROVIDER_LOG =
+ LoggerFactory.getLogger(OMFailoverProxyProvider.class);
+
+ public OzoneManagerProtocolClientSideTranslatorPB(
+ OzoneManagerProtocolPB proxy, String clientId) {
+ this.rpcProxy = proxy;
+ this.clientID = clientId;
+ this.omFailoverProxyProvider = null;
+ }
/**
- * Constructor for KeySpaceManger Client.
- * @param rpcProxy
+ * Constructor for OM Protocol Client. This creates a {@link RetryProxy}
+ * over {@link OMFailoverProxyProvider} proxy. OMFailoverProxyProvider has
+ * one {@link OzoneManagerProtocolPB} proxy pointing to each OM node in the
+ * cluster.
*/
- public OzoneManagerProtocolClientSideTranslatorPB(
- OzoneManagerProtocolPB rpcProxy, String clientId) {
- this.rpcProxy = rpcProxy;
+ public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf,
+ String clientId, UserGroupInformation ugi) throws IOException {
+ this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi);
+
+ int maxRetries = conf.getInt(
+ OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
+ int maxFailovers = conf.getInt(
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+ int sleepBase = conf.getInt(
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
+ int sleepMax = conf.getInt(
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY,
+ OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT);
+
+ this.rpcProxy = TracingUtil.createProxy(
+ createRetryProxy(omFailoverProxyProvider, maxRetries, maxFailovers,
+ sleepBase, sleepMax),
+ OzoneManagerProtocolPB.class);
this.clientID = clientId;
}
+ /**
+ * Creates a {@link RetryProxy} encapsulating the
+ * {@link OMFailoverProxyProvider}. The retry proxy fails over on network
+ * exception or if the current proxy is not the leader OM.
+ */
+ private OzoneManagerProtocolPB createRetryProxy(
+ OMFailoverProxyProvider failoverProxyProvider,
+ int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {
+ RetryPolicy retryPolicyOnNetworkException = RetryPolicies
+ .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+ maxFailovers, maxRetries, delayMillis, maxDelayBase);
+ RetryPolicy retryPolicy = new RetryPolicy() {
+ @Override
+ public RetryAction shouldRetry(Exception exception, int retries,
+ int failovers, boolean isIdempotentOrAtMostOnce)
+ throws Exception {
+ if (exception instanceof EOFException ||
+ exception instanceof ServiceException) {
+ if (retries < maxRetries && failovers < maxFailovers) {
+ return RetryAction.FAILOVER_AND_RETRY;
+ } else {
+ FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
+ "Attempted {} retries and {} failovers", retries, failovers);
+ return RetryAction.FAIL;
+ }
+ } else {
+ return retryPolicyOnNetworkException.shouldRetry(
+ exception, retries, failovers, isIdempotentOrAtMostOnce);
+ }
+ }
+ };
+ OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
+ OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
+ return proxy;
+ }
+
+ @VisibleForTesting
+ public OMFailoverProxyProvider getOMFailoverProxyProvider() {
+ return omFailoverProxyProvider;
+ }
+
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
@@ -196,7 +279,19 @@ private OMResponse submitRequest(OMRequest omRequest)
OMRequest payload = OMRequest.newBuilder(omRequest)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
- return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
+
+ OMResponse omResponse =
+ rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
+
+ if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
+ String leaderOmId = omResponse.getLeaderOMNodeId();
+
+ // Failover to the OM node returned by OMReponse leaderOMNodeId if
+ // current proxy is not pointing to that node.
+ omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
+ }
+
+ return omResponse;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index aaf3c859d5..b1168260c8 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -140,6 +140,8 @@ message OMResponse {
required Status status = 5;
+ optional string leaderOMNodeId = 6;
+
optional CreateVolumeResponse createVolumeResponse = 11;
optional SetVolumePropertyResponse setVolumePropertyResponse = 12;
optional CheckVolumeAccessResponse checkVolumeAccessResponse = 13;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index a1ef1f6424..f84f95ea8e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -32,7 +32,9 @@
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@@ -48,6 +50,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
+ private Map ozoneManagerMap;
private List ozoneManagers;
private static final Random RANDOM = new Random();
@@ -63,11 +66,12 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
private MiniOzoneHAClusterImpl(
OzoneConfiguration conf,
- List omList,
+ Map omMap,
StorageContainerManager scm,
List hddsDatanodes) {
super(conf, scm, hddsDatanodes);
- this.ozoneManagers = omList;
+ this.ozoneManagerMap = omMap;
+ this.ozoneManagers = new ArrayList<>(omMap.values());
}
/**
@@ -107,6 +111,10 @@ public void stopOzoneManager(int index) {
ozoneManagers.get(index).stop();
}
+ public void stopOzoneManager(String omNodeId) {
+ ozoneManagerMap.get(omNodeId).stop();
+ }
+
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
@@ -128,17 +136,17 @@ public MiniOzoneCluster build() throws IOException {
DefaultMetricsSystem.setMiniClusterMode(true);
initializeConfiguration();
StorageContainerManager scm;
- List omList;
+ Map omMap;
try {
scm = createSCM();
scm.start();
- omList = createOMService();
+ omMap = createOMService();
} catch (AuthenticationException ex) {
throw new IOException("Unable to build MiniOzoneCluster. ", ex);
}
final List hddsDatanodes = createHddsDatanodes(scm);
- MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList,
+ MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
scm, hddsDatanodes);
if (startDataNodes) {
cluster.startHddsDatanodes();
@@ -171,10 +179,10 @@ void initializeConfiguration() throws IOException {
* @throws IOException
* @throws AuthenticationException
*/
- private List createOMService() throws IOException,
+ private Map createOMService() throws IOException,
AuthenticationException {
- List omList = new ArrayList<>(numOfOMs);
+ Map omMap = new HashMap<>();
int retryCount = 0;
int basePort = 10000;
@@ -186,10 +194,11 @@ private List createOMService() throws IOException,
for (int i = 1; i<= numOfOMs; i++) {
// Set nodeId
- conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i);
+ String nodeId = nodeIdBaseStr + i;
+ conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
// Set metadata/DB dir base path
- String metaDirPath = path + "/" + nodeIdBaseStr + i;
+ String metaDirPath = path + "/" + nodeId;
conf.set(OZONE_METADATA_DIRS, metaDirPath);
OMStorage omStore = new OMStorage(conf);
initializeOmStorage(omStore);
@@ -201,7 +210,7 @@ private List createOMService() throws IOException,
OzoneManager om = OzoneManager.createOm(null, conf);
om.setCertClient(certClient);
- omList.add(om);
+ omMap.put(nodeId, om);
om.start();
LOG.info("Started OzoneManager RPC server at " +
@@ -211,23 +220,24 @@ private List createOMService() throws IOException,
// Set default OM address to point to the first OM. Clients would
// try connecting to this address by default
conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
- NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
+ NetUtils.getHostPortString(omMap.get(nodeIdBaseStr + 1)
+ .getOmRpcServerAddr()));
break;
} catch (BindException e) {
- for (OzoneManager om : omList) {
+ for (OzoneManager om : omMap.values()) {
om.stop();
om.join();
LOG.info("Stopping OzoneManager server at " +
om.getOmRpcServerAddr());
}
- omList.clear();
+ omMap.clear();
++retryCount;
LOG.info("MiniOzoneHACluster port conflicts, retried " +
retryCount + " times");
}
}
- return omList;
+ return omMap;
}
/**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 32792ae149..0828fe833c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -65,8 +65,6 @@
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -76,6 +74,7 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -189,9 +188,10 @@ public static void setScmId(String scmId){
*/
@Test
public void testOMClientProxyProvider() {
- OMProxyProvider omProxyProvider = store.getClientProxy()
+ OMFailoverProxyProvider omFailoverProxyProvider = store.getClientProxy()
.getOMProxyProvider();
- List omProxies = omProxyProvider.getOMProxies();
+ List omProxies =
+ omFailoverProxyProvider.getOMProxies();
// For a non-HA OM service, there should be only one OM proxy.
Assert.assertEquals(1, omProxies.size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 62cda916d1..da8f870650 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -18,30 +18,29 @@
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
-import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
-import org.apache.hadoop.ozone.web.response.VolumeInfo;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
@@ -49,6 +48,14 @@
import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
.NODE_FAILURE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
@@ -58,8 +65,7 @@
public class TestOzoneManagerHA {
private MiniOzoneHAClusterImpl cluster = null;
- private StorageHandler storageHandler;
- private UserArgs userArgs;
+ private ObjectStore objectStore;
private OzoneConfiguration conf;
private String clusterId;
private String scmId;
@@ -69,7 +75,7 @@ public class TestOzoneManagerHA {
public ExpectedException exception = ExpectedException.none();
@Rule
- public Timeout timeout = new Timeout(60_000);
+ public Timeout timeout = new Timeout(120_000);
/**
* Create a MiniDFSCluster for testing.
@@ -85,6 +91,9 @@ public void init() throws Exception {
scmId = UUID.randomUUID().toString();
conf.setBoolean(OZONE_ACL_ENABLED, true);
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+ conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
+ conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
+ conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
@@ -93,9 +102,7 @@ public void init() throws Exception {
.setNumOfOzoneManagers(numOfOMs)
.build();
cluster.waitForClusterToBeReady();
- storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
- userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
- null, null, null, null);
+ objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
}
/**
@@ -115,7 +122,7 @@ public void shutdown() {
*/
@Test
public void testAllOMNodesRunning() throws Exception {
- testCreateVolume(true);
+ createVolumeTest(true);
}
/**
@@ -126,52 +133,56 @@ public void testOneOMNodeDown() throws Exception {
cluster.stopOzoneManager(1);
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
- testCreateVolume(true);
+ createVolumeTest(true);
}
/**
* Test client request fails when 2 OMs are down.
*/
@Test
- @Ignore("TODO:HDDS-1158")
public void testTwoOMNodesDown() throws Exception {
cluster.stopOzoneManager(1);
cluster.stopOzoneManager(2);
Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
- testCreateVolume(false);
+ createVolumeTest(false);
}
/**
* Create a volume and test its attribute.
*/
- private void testCreateVolume(boolean checkSuccess) throws Exception {
+ private void createVolumeTest(boolean checkSuccess) throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
- VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
- createVolumeArgs.setUserName(userName);
- createVolumeArgs.setAdminName(adminName);
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
try {
- storageHandler.createVolume(createVolumeArgs);
+ objectStore.createVolume(volumeName, createVolumeArgs);
- VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
- VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
+ OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
if (checkSuccess) {
- Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
- Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
+ Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
+ Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
+ Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
} else {
// Verify that the request failed
- Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty());
Assert.fail("There is no quorum. Request should have failed");
}
- } catch (OMException e) {
+ } catch (ConnectException | RemoteException e) {
if (!checkSuccess) {
- GenericTestUtils.assertExceptionContains(
- "RaftRetryFailureException", e);
+ // If the last OM to be tried by the RetryProxy is down, we would get
+ // ConnectException. Otherwise, we would get a RemoteException from the
+ // last running OM as it would fail to get a quorum.
+ if (e instanceof RemoteException) {
+ GenericTestUtils.assertExceptionContains(
+ "RaftRetryFailureException", e);
+ }
} else {
throw e;
}
@@ -179,14 +190,16 @@ private void testCreateVolume(boolean checkSuccess) throws Exception {
}
/**
- * Test that OMProxyProvider creates an OM proxy for each OM in the cluster.
+ * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
+ * cluster.
*/
@Test
- public void testOMClientProxyProvide() throws Exception {
+ public void testOMProxyProviderInitialization() throws Exception {
OzoneClient rpcClient = cluster.getRpcClient();
- OMProxyProvider omProxyProvider =
+ OMFailoverProxyProvider omFailoverProxyProvider =
rpcClient.getObjectStore().getClientProxy().getOMProxyProvider();
- List omProxies = omProxyProvider.getOMProxies();
+ List omProxies =
+ omFailoverProxyProvider.getOMProxies();
Assert.assertEquals(numOfOMs, omProxies.size());
@@ -194,7 +207,7 @@ public void testOMClientProxyProvide() throws Exception {
InetSocketAddress omRpcServerAddr =
cluster.getOzoneManager(i).getOmRpcServerAddr();
boolean omClientProxyExists = false;
- for (OMProxyInfo omProxyInfo : omProxies) {
+ for (OMFailoverProxyProvider.OMProxyInfo omProxyInfo : omProxies) {
if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
omClientProxyExists = true;
break;
@@ -205,4 +218,99 @@ public void testOMClientProxyProvide() throws Exception {
omClientProxyExists);
}
}
+
+ /**
+ * Test OMFailoverProxyProvider failover on connection exception to OM client.
+ */
+ @Test
+ public void testOMProxyProviderFailoverOnConnectionFailure()
+ throws Exception {
+ OMFailoverProxyProvider omFailoverProxyProvider =
+ objectStore.getClientProxy().getOMProxyProvider();
+ String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ createVolumeTest(true);
+
+ // On stopping the current OM Proxy, the next connection attempt should
+ // failover to a another OM proxy.
+ cluster.stopOzoneManager(firstProxyNodeId);
+ Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT * 4);
+
+ // Next request to the proxy provider should result in a failover
+ createVolumeTest(true);
+ Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
+
+ // Get the new OM Proxy NodeId
+ String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // Verify that a failover occured. the new proxy nodeId should be
+ // different from the old proxy nodeId.
+ Assert.assertNotEquals("Failover did not occur as expected",
+ firstProxyNodeId, newProxyNodeId);
+ }
+
+ /**
+ * Test OMFailoverProxyProvider failover when current OM proxy is not
+ * the current OM Leader.
+ */
+ @Test
+ public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
+ OMFailoverProxyProvider omFailoverProxyProvider =
+ objectStore.getClientProxy().getOMProxyProvider();
+
+ // Run couple of createVolume tests to discover the current Leader OM
+ createVolumeTest(true);
+ createVolumeTest(true);
+
+ // The OMFailoverProxyProvider will point to the current leader OM node.
+ String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // Perform a manual failover of the proxy provider to move the
+ // currentProxyIndex to a node other than the leader OM.
+ omFailoverProxyProvider.performFailover(
+ omFailoverProxyProvider.getProxy().proxy);
+
+ String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+ Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);
+
+ // Once another request is sent to this new proxy node, the leader
+ // information must be returned via the response and a failover must
+ // happen to the leader proxy node.
+ createVolumeTest(true);
+ Thread.sleep(2000);
+
+ String newLeaderOMNodeId =
+ omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+ // The old and new Leader OM NodeId must match since there was no new
+ // election in the Ratis ring.
+ Assert.assertEquals(leaderOMNodeId, newLeaderOMNodeId);
+ }
+
+ @Test
+ public void testOMRetryProxy() throws Exception {
+ // Stop all the OMs. After making 5 (set maxRetries value) attempts at
+ // connection, the RpcClient should give up.
+ for (int i = 0; i < numOfOMs; i++) {
+ cluster.stopOzoneManager(i);
+ }
+
+ final LogVerificationAppender appender = new LogVerificationAppender();
+ final org.apache.log4j.Logger logger = Logger.getRootLogger();
+ logger.addAppender(appender);
+
+ try {
+ createVolumeTest(true);
+ Assert.fail("TestOMRetryProxy should fail when there are no OMs running");
+ } catch (ConnectException e) {
+ // Each retry attempt tries upto 10 times to connect. So there should be
+ // 3*10 "Retrying connect to server" messages
+ Assert.assertEquals(30,
+ appender.countLinesWithMessage("Retrying connect to server:"));
+
+ Assert.assertEquals(1,
+ appender.countLinesWithMessage("Failed to connect to OM. Attempted " +
+ "3 retries and 3 failovers"));
+ }
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 17552368bb..fc4ad01801 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -70,6 +70,7 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -2610,4 +2611,9 @@ public CertificateClient getCertificateClient() {
public String getComponent() {
return omComponent;
}
+
+ @Override
+ public OMFailoverProxyProvider getOMFailoverProxyProvider() {
+ return null;
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
index 9115421d8e..8e4582d660 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
@@ -32,6 +32,7 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
@@ -100,10 +101,12 @@ static Message convertResponseToMessage(OMResponse response) {
return Message.valueOf(ByteString.copyFrom(requestBytes));
}
- static OMResponse convertByteStringToOMResponse(ByteString byteString)
+ static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
throws InvalidProtocolBufferException {
- byte[] bytes = byteString.toByteArray();
- return OMResponse.parseFrom(bytes);
+ byte[] bytes = reply.getMessage().getContent().toByteArray();
+ return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
+ .setLeaderOMNodeId(reply.getReplierId())
+ .build();
}
static OMResponse getErrorResponse(Type cmdType, Exception e) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index 9e1cafc30c..1b4c6347d9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -23,7 +23,10 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+
import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ServiceException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -53,24 +56,24 @@ public final class OzoneManagerRatisClient implements Closeable {
OzoneManagerRatisClient.class);
private final RaftGroup raftGroup;
- private final String omID;
+ private final String omNodeID;
private final RpcType rpcType;
private RaftClient raftClient;
private final RetryPolicy retryPolicy;
private final Configuration conf;
- private OzoneManagerRatisClient(String omId, RaftGroup raftGroup,
+ private OzoneManagerRatisClient(String omNodeId, RaftGroup raftGroup,
RpcType rpcType, RetryPolicy retryPolicy,
Configuration config) {
this.raftGroup = raftGroup;
- this.omID = omId;
+ this.omNodeID = omNodeId;
this.rpcType = rpcType;
this.retryPolicy = retryPolicy;
this.conf = config;
}
public static OzoneManagerRatisClient newOzoneManagerRatisClient(
- String omId, RaftGroup raftGroup, Configuration conf) {
+ String omNodeId, RaftGroup raftGroup, Configuration conf) {
final String rpcType = conf.get(
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
@@ -87,19 +90,19 @@ public static OzoneManagerRatisClient newOzoneManagerRatisClient(
final RetryPolicy retryPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
- return new OzoneManagerRatisClient(omId, raftGroup,
+ return new OzoneManagerRatisClient(omNodeId, raftGroup,
SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf);
}
public void connect() {
LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}",
- raftGroup.getGroupId().getUuid().toString(), omID);
+ raftGroup.getGroupId().getUuid().toString(), omNodeID);
// TODO : XceiverClient ratis should pass the config value of
// maxOutstandingRequests so as to set the upper bound on max no of async
// requests to be handled by raft client
- raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup,
+ raftClient = OMRatisHelper.newRaftClient(rpcType, omNodeID, raftGroup,
retryPolicy, conf);
}
@@ -119,13 +122,12 @@ public void close() {
* @param request Request
* @return Response to the command
*/
- public OMResponse sendCommand(OMRequest request) {
+ public OMResponse sendCommand(OMRequest request) throws ServiceException {
try {
CompletableFuture reply = sendCommandAsync(request);
return reply.get();
} catch (ExecutionException | InterruptedException e) {
- LOG.error("Failed to execute command: " + request, e);
- return OMRatisHelper.getErrorResponse(request.getCmdType(), e);
+ throw new ServiceException(e);
}
}
@@ -152,9 +154,10 @@ private CompletableFuture sendCommandAsync(OMRequest request) {
if (raftRetryFailureException != null) {
throw new CompletionException(raftRetryFailureException);
}
+
OMResponse response = OMRatisHelper
- .convertByteStringToOMResponse(reply.getMessage()
- .getContent());
+ .getOMResponseFromRaftClientReply(reply);
+
return response;
} catch (InvalidProtocolBufferException e) {
throw new CompletionException(e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 5684fa561b..2f1d64d894 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -80,7 +80,8 @@ public OMResponse submitRequest(RpcController controller,
/**
* Submits request to OM's Ratis server.
*/
- private OMResponse submitRequestToRatis(OMRequest request) {
+ private OMResponse submitRequestToRatis(OMRequest request)
+ throws ServiceException {
return omRatisClient.sendCommand(request);
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
index 83d2245b40..8a8be357c8 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
@@ -34,8 +34,6 @@
import org.apache.hadoop.ozone.om.OMNodeDetails;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.RaftGroupId;
@@ -109,27 +107,6 @@ public void testStartOMRatisServer() throws Exception {
LifeCycle.State.RUNNING, omRatisServer.getServerState());
}
- /**
- * Submit any request to OM Ratis server and check that the dummy response
- * message is received.
- */
- @Test
- public void testSubmitRatisRequest() throws Exception {
- // Wait for leader election
- Thread.sleep(LEADER_ELECTION_TIMEOUT * 2);
- OMRequest request = OMRequest.newBuilder()
- .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
- .setClientId(clientId)
- .build();
-
- OMResponse response = omRatisClient.sendCommand(request);
-
- Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateVolume,
- response.getCmdType());
- Assert.assertEquals(false, response.getSuccess());
- Assert.assertEquals(false, response.hasCreateVolumeResponse());
- }
-
/**
* Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
* categorized in {@link OmUtils#isReadOnly(OMRequest)}.