YARN-11664. Remove HDFS Binaries/Jars Dependency From Yarn (#6631)

To support YARN deployments in clusters without HDFS
some changes have been made in packaging

* new hadoop-common class org.apache.hadoop.fs.HdfsCommonConstants
* hdfs class org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair moved
  from hdfs-client to hadoop-common
* YARN handlers for DSQuotaExceededException replaced by use of superclass
  ClusterStorageCapacityExceededException.  

Contributed by Syed Shameerur Rahman
This commit is contained in:
Syed Shameerur Rahman 2024-09-04 17:56:42 +05:30 committed by GitHub
parent 9486844610
commit 6c01490f14
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 100 additions and 18 deletions

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
/**
* This class contains constants for configuration keys and default values.
*/
@InterfaceAudience.LimitedPrivate({"YARN", "HDFS"})
@InterfaceStability.Evolving
public final class HdfsCommonConstants {
/**
* Represents the kind of delegation token used for HDFS.
* This is a constant string value "HDFS_DELEGATION_TOKEN".
*/
public static final Text HDFS_DELEGATION_KIND =
new Text("HDFS_DELEGATION_TOKEN");
/**
* DFS_ADMIN configuration: {@value}.
*/
public static final String DFS_ADMIN = "dfs.cluster.administrators";
private HdfsCommonConstants() {
}
}

View File

@ -22,13 +22,13 @@
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
/** /**
* A little struct class to wrap an InputStream and an OutputStream. * This class wraps an InputStream and an OutputStream.
* Both the InputStream and OutputStream are closed on close call.
* This class is moved from HDFS module to COMMON module for removing HDFS dependencies from YARN.
*/ */
@InterfaceAudience.Private
public class IOStreamPair implements Closeable { public class IOStreamPair implements Closeable {
public final InputStream in; public final InputStream in;
public final OutputStream out; public final OutputStream out;

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This package provides access to following class.
* {@link org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair} class.
*/
@InterfaceAudience.Private
package org.apache.hadoop.hdfs.protocol.datatransfer;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -26,6 +26,7 @@
import org.apache.commons.collections.map.LRUMap; import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.HdfsCommonConstants;
import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -41,8 +42,12 @@
@InterfaceAudience.Private @InterfaceAudience.Private
public class DelegationTokenIdentifier public class DelegationTokenIdentifier
extends AbstractDelegationTokenIdentifier { extends AbstractDelegationTokenIdentifier {
public static final Text HDFS_DELEGATION_KIND =
new Text("HDFS_DELEGATION_TOKEN"); /**
* The value is referenced from {@link HdfsCommonConstants#HDFS_DELEGATION_KIND}.
*/
@Deprecated
public static final Text HDFS_DELEGATION_KIND = HdfsCommonConstants.HDFS_DELEGATION_KIND;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static Map<TokenIdentifier, UserGroupInformation> ugiCache = private static Map<TokenIdentifier, UserGroupInformation> ugiCache =

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.HdfsCommonConstants;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology; import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -381,7 +382,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_XATTRS_ENABLED_KEY = "dfs.namenode.xattrs.enabled"; public static final String DFS_NAMENODE_XATTRS_ENABLED_KEY = "dfs.namenode.xattrs.enabled";
public static final boolean DFS_NAMENODE_XATTRS_ENABLED_DEFAULT = true; public static final boolean DFS_NAMENODE_XATTRS_ENABLED_DEFAULT = true;
public static final String DFS_ADMIN = "dfs.cluster.administrators"; /**
* The value is referenced from {@link HdfsCommonConstants#DFS_ADMIN}.
*/
@Deprecated
public static final String DFS_ADMIN = HdfsCommonConstants.DFS_ADMIN;
public static final String DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.https.server.keystore.resource"; public static final String DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.https.server.keystore.resource";
public static final String DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-server.xml"; public static final String DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-server.xml";
public static final String DFS_SERVER_HTTPS_KEYPASSWORD_KEY = "ssl.server.keystore.keypassword"; public static final String DFS_SERVER_HTTPS_KEYPASSWORD_KEY = "ssl.server.keystore.keypassword";

View File

@ -23,8 +23,8 @@
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.HdfsCommonConstants;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -230,7 +230,7 @@ private static void removeHdfsDelegationToken(UserGroupInformation user) {
while (iter.hasNext()) { while (iter.hasNext()) {
Token<? extends TokenIdentifier> token = iter.next(); Token<? extends TokenIdentifier> token = iter.next();
if (token.getKind().equals( if (token.getKind().equals(
DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) { HdfsCommonConstants.HDFS_DELEGATION_KIND)) {
LOG.info("Remove HDFS delegation token {}.", token); LOG.info("Remove HDFS delegation token {}.", token);
iter.remove(); iter.remove();
} }

View File

@ -31,10 +31,10 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsCommonConstants;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.registry.client.api.RegistryConstants; import org.apache.hadoop.registry.client.api.RegistryConstants;
@ -1707,12 +1707,12 @@ private boolean checkPermissions(Path dependencyLibTarGzip) throws
YarnConfiguration.YARN_ADMIN_ACL, YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
AccessControlList dfsAdminAcl = new AccessControlList( AccessControlList dfsAdminAcl = new AccessControlList(
getConfig().get(DFSConfigKeys.DFS_ADMIN, " ")); getConfig().get(HdfsCommonConstants.DFS_ADMIN, " "));
UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
if (!yarnAdminAcl.isUserAllowed(ugi) && !dfsAdminAcl.isUserAllowed(ugi)) { if (!yarnAdminAcl.isUserAllowed(ugi) && !dfsAdminAcl.isUserAllowed(ugi)) {
LOG.error("User must be on the {} or {} list to have permission to " + LOG.error("User must be on the {} or {} list to have permission to " +
"upload AM dependency tarball", YarnConfiguration.YARN_ADMIN_ACL, "upload AM dependency tarball", YarnConfiguration.YARN_ADMIN_ACL,
DFSConfigKeys.DFS_ADMIN); HdfsCommonConstants.DFS_ADMIN);
return false; return false;
} }

View File

@ -20,9 +20,9 @@
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.HdfsCommonConstants;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.client.cli.ApplicationCLI; import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
@ -138,7 +138,7 @@ public void setup() throws Throwable {
basedir.mkdirs(); basedir.mkdirs();
} }
yarnAdminNoneAclProp = YarnConfiguration.YARN_ADMIN_ACL + "=none"; yarnAdminNoneAclProp = YarnConfiguration.YARN_ADMIN_ACL + "=none";
dfsAdminAclProp = DFSConfigKeys.DFS_ADMIN + "=" + dfsAdminAclProp = HdfsCommonConstants.DFS_ADMIN + "=" +
UserGroupInformation.getCurrentUser(); UserGroupInformation.getCurrentUser();
System.setProperty(YarnServiceConstants.PROPERTY_LIB_DIR, basedir System.setProperty(YarnServiceConstants.PROPERTY_LIB_DIR, basedir
.getAbsolutePath()); .getAbsolutePath());

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
@ -60,7 +61,6 @@
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils; import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
@ -547,7 +547,7 @@ public void append(LogKey logKey, LogValue logValue) throws IOException {
} }
@Override @Override
public void close() throws DSQuotaExceededException { public void close() throws ClusterStorageCapacityExceededException {
try { try {
if (writer != null) { if (writer != null) {
writer.close(); writer.close();
@ -557,7 +557,7 @@ public void close() throws DSQuotaExceededException {
} finally { } finally {
try { try {
this.fsDataOStream.close(); this.fsDataOStream.close();
} catch (DSQuotaExceededException e) { } catch (ClusterStorageCapacityExceededException e) {
LOG.error("Exception in closing {}", LOG.error("Exception in closing {}",
this.fsDataOStream.getClass(), e); this.fsDataOStream.getClass(), e);
throw e; throw e;

View File

@ -38,12 +38,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFs; import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@ -99,7 +99,7 @@ public void closeWriter() throws LogAggregationDFSException {
if (this.writer != null) { if (this.writer != null) {
try { try {
this.writer.close(); this.writer.close();
} catch (DSQuotaExceededException e) { } catch (ClusterStorageCapacityExceededException e) {
throw new LogAggregationDFSException(e); throw new LogAggregationDFSException(e);
} finally { } finally {
this.writer = null; this.writer = null;