HDDS-2162. Make OM Generic related configuration support HA style config. (#1511)
This commit is contained in:
parent
685918ef41
commit
169cef758d
@ -404,18 +404,14 @@ private static void addFilesToArchive(String source, File file,
|
||||
/**
|
||||
* If a OM conf is only set with key suffixed with OM Node ID, return the
|
||||
* set value.
|
||||
* @return null if base conf key is set, otherwise the value set for
|
||||
* key suffixed with Node ID.
|
||||
* @return if the value is set for key suffixed with OM Node ID, return the
|
||||
* value, else return null.
|
||||
*/
|
||||
public static String getConfSuffixedWithOMNodeId(Configuration conf,
|
||||
String confKey, String omServiceID, String omNodeId) {
|
||||
String confValue = conf.getTrimmed(confKey);
|
||||
if (StringUtils.isNotEmpty(confValue)) {
|
||||
return null;
|
||||
}
|
||||
String suffixedConfKey = OmUtils.addKeySuffixes(
|
||||
confKey, omServiceID, omNodeId);
|
||||
confValue = conf.getTrimmed(suffixedConfKey);
|
||||
String confValue = conf.getTrimmed(suffixedConfKey);
|
||||
if (StringUtils.isNotEmpty(confValue)) {
|
||||
return confValue;
|
||||
}
|
||||
|
@ -119,10 +119,13 @@ public void testDefaultPortIfNotSpecified() throws Exception {
|
||||
String omNode1Id = "omNode1";
|
||||
String omNode2Id = "omNode2";
|
||||
String omNodesKeyValue = omNode1Id + "," + omNode2Id;
|
||||
conf.set(OMConfigKeys.OZONE_OM_NODES_KEY, omNodesKeyValue);
|
||||
String serviceID = "service1";
|
||||
conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, serviceID);
|
||||
conf.set(OMConfigKeys.OZONE_OM_NODES_KEY + "." + serviceID,
|
||||
omNodesKeyValue);
|
||||
|
||||
String omNode1RpcAddrKey = getOMAddrKeyWithSuffix(null, omNode1Id);
|
||||
String omNode2RpcAddrKey = getOMAddrKeyWithSuffix(null, omNode2Id);
|
||||
String omNode1RpcAddrKey = getOMAddrKeyWithSuffix(serviceID, omNode1Id);
|
||||
String omNode2RpcAddrKey = getOMAddrKeyWithSuffix(serviceID, omNode2Id);
|
||||
|
||||
conf.set(omNode1RpcAddrKey, "0.0.0.0");
|
||||
conf.set(omNode2RpcAddrKey, "122.0.0.122");
|
||||
|
@ -42,7 +42,6 @@
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.HddsUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
@ -74,9 +73,10 @@
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ozone.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
|
||||
import org.apache.hadoop.ozone.OzoneSecurityUtil;
|
||||
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
|
||||
import org.apache.hadoop.ozone.om.ha.OMHANodeDetails;
|
||||
import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
|
||||
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
|
||||
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
|
||||
@ -207,10 +207,6 @@
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODE_ID_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
|
||||
@ -310,12 +306,32 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
|
||||
super(OzoneVersionInfo.OZONE_VERSION_INFO);
|
||||
Preconditions.checkNotNull(conf);
|
||||
configuration = conf;
|
||||
// Load HA related configurations
|
||||
OMHANodeDetails omhaNodeDetails =
|
||||
OMHANodeDetails.loadOMHAConfig(configuration);
|
||||
|
||||
this.peerNodes = omhaNodeDetails.getPeerNodeDetails();
|
||||
this.omNodeDetails = omhaNodeDetails.getLocalNodeDetails();
|
||||
|
||||
omStorage = new OMStorage(conf);
|
||||
omId = omStorage.getOmId();
|
||||
|
||||
// In case of single OM Node Service there will be no OM Node ID
|
||||
// specified, set it to value from om storage
|
||||
if (this.omNodeDetails.getOMNodeId() == null) {
|
||||
this.omNodeDetails =
|
||||
OMHANodeDetails.getOMNodeDetails(conf, omNodeDetails.getOMServiceId(),
|
||||
omStorage.getOmId(), omNodeDetails.getRpcAddress(),
|
||||
omNodeDetails.getRatisPort());
|
||||
}
|
||||
|
||||
loginOMUserIfSecurityEnabled(conf);
|
||||
|
||||
this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME,
|
||||
OZONE_OM_USER_MAX_VOLUME_DEFAULT);
|
||||
Preconditions.checkArgument(this.maxUserVolumeCount > 0,
|
||||
OZONE_OM_USER_MAX_VOLUME + " value should be greater than zero");
|
||||
omStorage = new OMStorage(conf);
|
||||
omId = omStorage.getOmId();
|
||||
|
||||
if (omStorage.getState() != StorageState.INITIALIZED) {
|
||||
throw new OMException("OM not initialized.",
|
||||
ResultCodes.OM_NOT_INITIALIZED);
|
||||
@ -342,8 +358,7 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
|
||||
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
|
||||
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
|
||||
|
||||
// Load HA related configurations
|
||||
loadOMHAConfigs(configuration);
|
||||
|
||||
InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
|
||||
omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
|
||||
|
||||
@ -420,7 +435,7 @@ private OzoneManager(OzoneConfiguration conf) throws IOException,
|
||||
OzoneManagerProtocolProtos.Type.values());
|
||||
|
||||
// Start Om Rpc Server.
|
||||
omRpcServer = getRpcServer(conf);
|
||||
omRpcServer = getRpcServer(configuration);
|
||||
omRpcAddress = updateRPCListenAddress(configuration,
|
||||
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
|
||||
|
||||
@ -513,195 +528,6 @@ public boolean isGrpcBlockTokenEnabled() {
|
||||
return grpcBlockTokenEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inspects and loads OM node configurations.
|
||||
*
|
||||
* If {@link OMConfigKeys#OZONE_OM_SERVICE_IDS_KEY} is configured with
|
||||
* multiple ids and/ or if {@link OMConfigKeys#OZONE_OM_NODE_ID_KEY} is not
|
||||
* specifically configured , this method determines the omServiceId
|
||||
* and omNodeId by matching the node's address with the configured
|
||||
* addresses. When a match is found, it sets the omServicId and omNodeId from
|
||||
* the corresponding configuration key. This method also finds the OM peers
|
||||
* nodes belonging to the same OM service.
|
||||
*
|
||||
* @param conf
|
||||
*/
|
||||
private void loadOMHAConfigs(Configuration conf) {
|
||||
InetSocketAddress localRpcAddress = null;
|
||||
String localOMServiceId = null;
|
||||
String localOMNodeId = null;
|
||||
int localRatisPort = 0;
|
||||
Collection<String> omServiceIds = conf.getTrimmedStringCollection(
|
||||
OZONE_OM_SERVICE_IDS_KEY);
|
||||
|
||||
String knownOMNodeId = conf.get(OZONE_OM_NODE_ID_KEY);
|
||||
int found = 0;
|
||||
boolean isOMAddressSet = false;
|
||||
|
||||
for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
|
||||
Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf, serviceId);
|
||||
|
||||
List<OMNodeDetails> peerNodesList = new ArrayList<>();
|
||||
boolean isPeer = false;
|
||||
for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
|
||||
if (knownOMNodeId != null && !knownOMNodeId.equals(nodeId)) {
|
||||
isPeer = true;
|
||||
} else {
|
||||
isPeer = false;
|
||||
}
|
||||
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
|
||||
serviceId, nodeId);
|
||||
String rpcAddrStr = OmUtils.getOmRpcAddress(conf, rpcAddrKey);
|
||||
if (rpcAddrStr == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If OM address is set for any node id, we will not fallback to the
|
||||
// default
|
||||
isOMAddressSet = true;
|
||||
|
||||
String ratisPortKey = OmUtils.addKeySuffixes(OZONE_OM_RATIS_PORT_KEY,
|
||||
serviceId, nodeId);
|
||||
int ratisPort = conf.getInt(ratisPortKey, OZONE_OM_RATIS_PORT_DEFAULT);
|
||||
|
||||
InetSocketAddress addr = null;
|
||||
try {
|
||||
addr = NetUtils.createSocketAddr(rpcAddrStr);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Exception in creating socket address " + addr, e);
|
||||
continue;
|
||||
}
|
||||
if (!addr.isUnresolved()) {
|
||||
if (!isPeer && OmUtils.isAddressLocal(addr)) {
|
||||
localRpcAddress = addr;
|
||||
localOMServiceId = serviceId;
|
||||
localOMNodeId = nodeId;
|
||||
localRatisPort = ratisPort;
|
||||
found++;
|
||||
} else {
|
||||
// This OMNode belongs to same OM service as the current OMNode.
|
||||
// Add it to peerNodes list.
|
||||
String httpAddr = OmUtils.getHttpAddressForOMPeerNode(conf,
|
||||
serviceId, nodeId, addr.getHostName());
|
||||
String httpsAddr = OmUtils.getHttpsAddressForOMPeerNode(conf,
|
||||
serviceId, nodeId, addr.getHostName());
|
||||
OMNodeDetails peerNodeInfo = new OMNodeDetails.Builder()
|
||||
.setOMServiceId(serviceId)
|
||||
.setOMNodeId(nodeId)
|
||||
.setRpcAddress(addr)
|
||||
.setRatisPort(ratisPort)
|
||||
.setHttpAddress(httpAddr)
|
||||
.setHttpsAddress(httpsAddr)
|
||||
.build();
|
||||
peerNodesList.add(peerNodeInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (found == 1) {
|
||||
LOG.debug("Found one matching OM address with service ID: {} and node" +
|
||||
" ID: {}", localOMServiceId, localOMNodeId);
|
||||
|
||||
setOMNodeDetails(localOMServiceId, localOMNodeId, localRpcAddress,
|
||||
localRatisPort);
|
||||
|
||||
this.peerNodes = peerNodesList;
|
||||
|
||||
LOG.info("Found matching OM address with OMServiceId: {}, " +
|
||||
"OMNodeId: {}, RPC Address: {} and Ratis port: {}",
|
||||
localOMServiceId, localOMNodeId,
|
||||
NetUtils.getHostPortString(localRpcAddress), localRatisPort);
|
||||
return;
|
||||
} else if (found > 1) {
|
||||
String msg = "Configuration has multiple " + OZONE_OM_ADDRESS_KEY +
|
||||
" addresses that match local node's address. Please configure the" +
|
||||
" system with " + OZONE_OM_SERVICE_IDS_KEY + " and " +
|
||||
OZONE_OM_ADDRESS_KEY;
|
||||
throw new OzoneIllegalArgumentException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isOMAddressSet) {
|
||||
// No OM address is set. Fallback to default
|
||||
InetSocketAddress omAddress = OmUtils.getOmAddress(conf);
|
||||
int ratisPort = conf.getInt(OZONE_OM_RATIS_PORT_KEY,
|
||||
OZONE_OM_RATIS_PORT_DEFAULT);
|
||||
|
||||
LOG.info("Configuration either no {} set. Falling back to the default " +
|
||||
"OM address {}", OZONE_OM_ADDRESS_KEY, omAddress);
|
||||
|
||||
setOMNodeDetails(null, null, omAddress, ratisPort);
|
||||
|
||||
} else {
|
||||
String msg = "Configuration has no " + OZONE_OM_ADDRESS_KEY + " " +
|
||||
"address that matches local node's address. Please configure the " +
|
||||
"system with " + OZONE_OM_ADDRESS_KEY;
|
||||
LOG.info(msg);
|
||||
throw new OzoneIllegalArgumentException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds and sets OMNodeDetails object.
|
||||
*/
|
||||
private void setOMNodeDetails(String serviceId, String nodeId,
|
||||
InetSocketAddress rpcAddress, int ratisPort) {
|
||||
|
||||
if (serviceId == null) {
|
||||
// If no serviceId is set, take the default serviceID om-service
|
||||
serviceId = OzoneConsts.OM_SERVICE_ID_DEFAULT;
|
||||
LOG.info("OM Service ID is not set. Setting it to the default ID: {}",
|
||||
serviceId);
|
||||
}
|
||||
if (nodeId == null) {
|
||||
// If no nodeId is set, take the omId from omStorage as the nodeID
|
||||
nodeId = omId;
|
||||
LOG.info("OM Node ID is not set. Setting it to the OmStorage's " +
|
||||
"OmID: {}", nodeId);
|
||||
}
|
||||
|
||||
this.omNodeDetails = new OMNodeDetails.Builder()
|
||||
.setOMServiceId(serviceId)
|
||||
.setOMNodeId(nodeId)
|
||||
.setRpcAddress(rpcAddress)
|
||||
.setRatisPort(ratisPort)
|
||||
.build();
|
||||
|
||||
// Set this nodes OZONE_OM_ADDRESS_KEY to the discovered address.
|
||||
configuration.set(OZONE_OM_ADDRESS_KEY,
|
||||
NetUtils.getHostPortString(rpcAddress));
|
||||
|
||||
// Get and set Http(s) address of local node. If base config keys are
|
||||
// not set, check for keys suffixed with OM serivce ID and node ID.
|
||||
setOMNodeSpecificConfigs(serviceId, nodeId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if any of the following configuration keys have been set using OM
|
||||
* Node ID suffixed to the key. If yes, then set the base key with the
|
||||
* configured valued.
|
||||
* 1. {@link OMConfigKeys#OZONE_OM_HTTP_ADDRESS_KEY}
|
||||
* 2. {@link OMConfigKeys#OZONE_OM_HTTPS_ADDRESS_KEY}
|
||||
* 3. {@link OMConfigKeys#OZONE_OM_HTTP_BIND_HOST_KEY}
|
||||
* 4. {@link OMConfigKeys#OZONE_OM_HTTPS_BIND_HOST_KEY}
|
||||
*/
|
||||
private void setOMNodeSpecificConfigs(String omServiceId, String omNodeId) {
|
||||
String[] confKeys = new String[] {
|
||||
OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY,
|
||||
OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY,
|
||||
OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KEY,
|
||||
OMConfigKeys.OZONE_OM_HTTPS_BIND_HOST_KEY};
|
||||
|
||||
for (String confKey : confKeys) {
|
||||
String confValue = OmUtils.getConfSuffixedWithOMNodeId(
|
||||
configuration, confKey, omServiceId, omNodeId);
|
||||
if (confValue != null) {
|
||||
LOG.info("Setting configuration key {} with value of key {}: {}",
|
||||
confKey, OmUtils.addKeySuffixes(confKey, omNodeId), confValue);
|
||||
configuration.set(confKey, confValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private KeyProviderCryptoExtension createKeyProviderExt(
|
||||
OzoneConfiguration conf) throws IOException {
|
||||
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
|
||||
@ -1024,7 +850,6 @@ private static boolean isOzoneSecurityEnabled() {
|
||||
*/
|
||||
public static OzoneManager createOm(OzoneConfiguration conf)
|
||||
throws IOException, AuthenticationException {
|
||||
loginOMUserIfSecurityEnabled(conf);
|
||||
return new OzoneManager(conf);
|
||||
}
|
||||
|
||||
@ -1053,6 +878,7 @@ private static void loginOMUserIfSecurityEnabled(OzoneConfiguration conf)
|
||||
@VisibleForTesting
|
||||
public static boolean omInit(OzoneConfiguration conf) throws IOException,
|
||||
AuthenticationException {
|
||||
OMHANodeDetails.loadOMHAConfig(conf);
|
||||
loginOMUserIfSecurityEnabled(conf);
|
||||
OMStorage omStorage = new OMStorage(conf);
|
||||
StorageState state = omStorage.getState();
|
||||
@ -1361,7 +1187,7 @@ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
|
||||
return omRpcServer;
|
||||
}
|
||||
|
||||
InetSocketAddress omNodeRpcAddr = OmUtils.getOmAddress(configuration);
|
||||
InetSocketAddress omNodeRpcAddr = OmUtils.getOmAddress(conf);
|
||||
|
||||
final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
|
||||
OZONE_OM_HANDLER_COUNT_DEFAULT);
|
||||
|
@ -0,0 +1,306 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om.ha;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODE_ID_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
|
||||
|
||||
/**
|
||||
* Class which maintains peer information and it's own OM node information.
|
||||
*/
|
||||
public class OMHANodeDetails {
|
||||
|
||||
private static String[] genericConfigKeys = new String[] {
|
||||
OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY,
|
||||
OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY,
|
||||
OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KEY,
|
||||
OMConfigKeys.OZONE_OM_HTTPS_BIND_HOST_KEY,
|
||||
OMConfigKeys.OZONE_OM_DB_DIRS,
|
||||
OMConfigKeys.OZONE_OM_ADDRESS_KEY,
|
||||
};
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(OMHANodeDetails.class);
|
||||
private final OMNodeDetails localNodeDetails;
|
||||
private final List<OMNodeDetails> peerNodeDetails;
|
||||
|
||||
public OMHANodeDetails(OMNodeDetails localNodeDetails,
|
||||
List<OMNodeDetails> peerNodeDetails) {
|
||||
this.localNodeDetails = localNodeDetails;
|
||||
this.peerNodeDetails = peerNodeDetails;
|
||||
}
|
||||
|
||||
public OMNodeDetails getLocalNodeDetails() {
|
||||
return localNodeDetails;
|
||||
}
|
||||
|
||||
public List< OMNodeDetails > getPeerNodeDetails() {
|
||||
return peerNodeDetails;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Inspects and loads OM node configurations.
|
||||
*
|
||||
* If {@link OMConfigKeys#OZONE_OM_SERVICE_IDS_KEY} is configured with
|
||||
* multiple ids and/ or if {@link OMConfigKeys#OZONE_OM_NODE_ID_KEY} is not
|
||||
* specifically configured , this method determines the omServiceId
|
||||
* and omNodeId by matching the node's address with the configured
|
||||
* addresses. When a match is found, it sets the omServicId and omNodeId from
|
||||
* the corresponding configuration key. This method also finds the OM peers
|
||||
* nodes belonging to the same OM service.
|
||||
*
|
||||
* @param conf
|
||||
*/
|
||||
public static OMHANodeDetails loadOMHAConfig(OzoneConfiguration conf) {
|
||||
InetSocketAddress localRpcAddress = null;
|
||||
String localOMServiceId = null;
|
||||
String localOMNodeId = null;
|
||||
int localRatisPort = 0;
|
||||
Collection<String> omServiceIds = conf.getTrimmedStringCollection(
|
||||
OZONE_OM_SERVICE_IDS_KEY);
|
||||
|
||||
String knownOMNodeId = conf.get(OZONE_OM_NODE_ID_KEY);
|
||||
int found = 0;
|
||||
boolean isOMAddressSet = false;
|
||||
|
||||
for (String serviceId : omServiceIds) {
|
||||
Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf, serviceId);
|
||||
|
||||
if (omNodeIds.size() == 0) {
|
||||
String msg = "Configuration does not have any value set for " +
|
||||
OZONE_OM_NODES_KEY + " for service ID " + serviceId + ". List of " +
|
||||
"OM Node ID's should be specified for the service ID";
|
||||
throw new OzoneIllegalArgumentException(msg);
|
||||
}
|
||||
|
||||
List<OMNodeDetails> peerNodesList = new ArrayList<>();
|
||||
boolean isPeer;
|
||||
for (String nodeId : omNodeIds) {
|
||||
if (knownOMNodeId != null && !knownOMNodeId.equals(nodeId)) {
|
||||
isPeer = true;
|
||||
} else {
|
||||
isPeer = false;
|
||||
}
|
||||
String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
|
||||
serviceId, nodeId);
|
||||
String rpcAddrStr = OmUtils.getOmRpcAddress(conf, rpcAddrKey);
|
||||
if (rpcAddrStr == null || rpcAddrStr.isEmpty()) {
|
||||
String msg = "Configuration does not have any value set for " +
|
||||
rpcAddrKey + "." + "OM Rpc Address should be set for all node " +
|
||||
"IDs for a service ID.";
|
||||
throw new OzoneIllegalArgumentException(msg);
|
||||
}
|
||||
|
||||
// If OM address is set for any node id, we will not fallback to the
|
||||
// default
|
||||
isOMAddressSet = true;
|
||||
|
||||
String ratisPortKey = OmUtils.addKeySuffixes(OZONE_OM_RATIS_PORT_KEY,
|
||||
serviceId, nodeId);
|
||||
int ratisPort = conf.getInt(ratisPortKey, OZONE_OM_RATIS_PORT_DEFAULT);
|
||||
|
||||
InetSocketAddress addr = null;
|
||||
try {
|
||||
addr = NetUtils.createSocketAddr(rpcAddrStr);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Exception in creating socket address " + addr, e);
|
||||
continue;
|
||||
}
|
||||
if (!addr.isUnresolved()) {
|
||||
if (!isPeer && OmUtils.isAddressLocal(addr)) {
|
||||
localRpcAddress = addr;
|
||||
localOMServiceId = serviceId;
|
||||
localOMNodeId = nodeId;
|
||||
localRatisPort = ratisPort;
|
||||
found++;
|
||||
} else {
|
||||
// This OMNode belongs to same OM service as the current OMNode.
|
||||
// Add it to peerNodes list.
|
||||
// This OMNode belongs to same OM service as the current OMNode.
|
||||
// Add it to peerNodes list.
|
||||
peerNodesList.add(getHAOMNodeDetails(conf, serviceId,
|
||||
nodeId, addr, ratisPort));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (found == 1) {
|
||||
LOG.debug("Found one matching OM address with service ID: {} and node" +
|
||||
" ID: {}", localOMServiceId, localOMNodeId);
|
||||
|
||||
LOG.info("Found matching OM address with OMServiceId: {}, " +
|
||||
"OMNodeId: {}, RPC Address: {} and Ratis port: {}",
|
||||
localOMServiceId, localOMNodeId,
|
||||
NetUtils.getHostPortString(localRpcAddress), localRatisPort);
|
||||
|
||||
|
||||
setOMNodeSpecificConfigs(conf, localOMServiceId, localOMNodeId);
|
||||
return new OMHANodeDetails(getHAOMNodeDetails(conf, localOMServiceId,
|
||||
localOMNodeId, localRpcAddress, localRatisPort), peerNodesList);
|
||||
|
||||
} else if (found > 1) {
|
||||
String msg = "Configuration has multiple " + OZONE_OM_ADDRESS_KEY +
|
||||
" addresses that match local node's address. Please configure the" +
|
||||
" system with " + OZONE_OM_SERVICE_IDS_KEY + " and " +
|
||||
OZONE_OM_ADDRESS_KEY;
|
||||
throw new OzoneIllegalArgumentException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isOMAddressSet) {
|
||||
// No OM address is set. Fallback to default
|
||||
InetSocketAddress omAddress = OmUtils.getOmAddress(conf);
|
||||
int ratisPort = conf.getInt(OZONE_OM_RATIS_PORT_KEY,
|
||||
OZONE_OM_RATIS_PORT_DEFAULT);
|
||||
|
||||
LOG.info("Configuration either no {} set. Falling back to the default " +
|
||||
"OM address {}", OZONE_OM_ADDRESS_KEY, omAddress);
|
||||
|
||||
return new OMHANodeDetails(getOMNodeDetails(conf, null,
|
||||
null, omAddress, ratisPort), new ArrayList<>());
|
||||
|
||||
} else {
|
||||
String msg = "Configuration has no " + OZONE_OM_ADDRESS_KEY + " " +
|
||||
"address that matches local node's address. Please configure the " +
|
||||
"system with " + OZONE_OM_ADDRESS_KEY;
|
||||
LOG.info(msg);
|
||||
throw new OzoneIllegalArgumentException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Local OM Node Details.
|
||||
* @param serviceId - Service ID this OM belongs to,
|
||||
* @param nodeId - Node ID of this OM.
|
||||
* @param rpcAddress - Rpc Address of the OM.
|
||||
* @param ratisPort - Ratis port of the OM.
|
||||
* @return OMNodeDetails
|
||||
*/
|
||||
public static OMNodeDetails getOMNodeDetails(OzoneConfiguration conf,
|
||||
String serviceId, String nodeId, InetSocketAddress rpcAddress,
|
||||
int ratisPort) {
|
||||
|
||||
if (serviceId == null) {
|
||||
// If no serviceId is set, take the default serviceID om-service
|
||||
serviceId = OzoneConsts.OM_SERVICE_ID_DEFAULT;
|
||||
LOG.info("OM Service ID is not set. Setting it to the default ID: {}",
|
||||
serviceId);
|
||||
}
|
||||
|
||||
|
||||
// We need to pass null for serviceID and nodeID as this is set for
|
||||
// non-HA cluster. This means one node OM cluster.
|
||||
String httpAddr = OmUtils.getHttpAddressForOMPeerNode(conf,
|
||||
null, null, rpcAddress.getHostName());
|
||||
String httpsAddr = OmUtils.getHttpsAddressForOMPeerNode(conf,
|
||||
null, null, rpcAddress.getHostName());
|
||||
|
||||
return new OMNodeDetails.Builder()
|
||||
.setOMServiceId(serviceId)
|
||||
.setOMNodeId(nodeId)
|
||||
.setRpcAddress(rpcAddress)
|
||||
.setRatisPort(ratisPort)
|
||||
.setHttpAddress(httpAddr)
|
||||
.setHttpsAddress(httpsAddr)
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create Local OM Node Details.
|
||||
* @param serviceId - Service ID this OM belongs to,
|
||||
* @param nodeId - Node ID of this OM.
|
||||
* @param rpcAddress - Rpc Address of the OM.
|
||||
* @param ratisPort - Ratis port of the OM.
|
||||
* @return OMNodeDetails
|
||||
*/
|
||||
public static OMNodeDetails getHAOMNodeDetails(OzoneConfiguration conf,
|
||||
String serviceId, String nodeId, InetSocketAddress rpcAddress,
|
||||
int ratisPort) {
|
||||
Preconditions.checkNotNull(serviceId);
|
||||
Preconditions.checkNotNull(nodeId);
|
||||
|
||||
String httpAddr = OmUtils.getHttpAddressForOMPeerNode(conf,
|
||||
serviceId, nodeId, rpcAddress.getHostName());
|
||||
String httpsAddr = OmUtils.getHttpsAddressForOMPeerNode(conf,
|
||||
serviceId, nodeId, rpcAddress.getHostName());
|
||||
|
||||
return new OMNodeDetails.Builder()
|
||||
.setOMServiceId(serviceId)
|
||||
.setOMNodeId(nodeId)
|
||||
.setRpcAddress(rpcAddress)
|
||||
.setRatisPort(ratisPort)
|
||||
.setHttpAddress(httpAddr)
|
||||
.setHttpsAddress(httpsAddr)
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check if any of the following configuration keys have been set using OM
|
||||
* Node ID suffixed to the key. If yes, then set the base key with the
|
||||
* configured valued.
|
||||
* 1. {@link OMConfigKeys#OZONE_OM_HTTP_ADDRESS_KEY}
|
||||
* 2. {@link OMConfigKeys#OZONE_OM_HTTPS_ADDRESS_KEY}
|
||||
* 3. {@link OMConfigKeys#OZONE_OM_HTTP_BIND_HOST_KEY}
|
||||
* 4. {@link OMConfigKeys#OZONE_OM_HTTPS_BIND_HOST_KEY}\
|
||||
* 5. {@link OMConfigKeys#OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE}
|
||||
* 6. {@link OMConfigKeys#OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY}
|
||||
* 7. {@link OMConfigKeys#OZONE_OM_KERBEROS_KEYTAB_FILE_KEY}
|
||||
* 8. {@link OMConfigKeys#OZONE_OM_KERBEROS_PRINCIPAL_KEY}
|
||||
* 9. {@link OMConfigKeys#OZONE_OM_DB_DIRS}
|
||||
* 10. {@link OMConfigKeys#OZONE_OM_ADDRESS_KEY}
|
||||
*/
|
||||
private static void setOMNodeSpecificConfigs(
|
||||
OzoneConfiguration ozoneConfiguration, String omServiceId,
|
||||
String omNodeId) {
|
||||
|
||||
for (String confKey : genericConfigKeys) {
|
||||
String confValue = OmUtils.getConfSuffixedWithOMNodeId(
|
||||
ozoneConfiguration, confKey, omServiceId, omNodeId);
|
||||
if (confValue != null) {
|
||||
LOG.info("Setting configuration key {} with value of key {}: {}",
|
||||
confKey, OmUtils.addKeySuffixes(confKey, omNodeId), confValue);
|
||||
ozoneConfiguration.set(confKey, confValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -15,7 +15,7 @@
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om;
|
||||
package org.apache.hadoop.ozone.om.ha;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om.ha;
|
||||
|
||||
/**
|
||||
* This package contains classes related to OM HA.
|
||||
*/
|
@ -40,7 +40,7 @@
|
||||
import org.apache.hadoop.conf.StorageUnit;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.om.OMNodeDetails;
|
||||
import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
||||
|
@ -23,7 +23,7 @@
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.ozone.om.OMNodeDetails;
|
||||
import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
|
||||
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
|
||||
import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
|
||||
import org.apache.http.Header;
|
||||
|
@ -32,7 +32,7 @@
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
import org.apache.hadoop.ozone.om.OMNodeDetails;
|
||||
import org.apache.hadoop.ozone.om.ha.OMNodeDetails;
|
||||
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
||||
|
Loading…
Reference in New Issue
Block a user