HDFS-6904. YARN unable to renew delegation token fetched via webhdfs due to incorrect service port.
This commit is contained in:
parent
e31f0a6558
commit
e2be333744
@ -628,6 +628,9 @@ Release 2.6.0 - UNRELEASED
|
|||||||
HDFS-7228. Add an SSD policy into the default BlockStoragePolicySuite.
|
HDFS-7228. Add an SSD policy into the default BlockStoragePolicySuite.
|
||||||
(jing9)
|
(jing9)
|
||||||
|
|
||||||
|
HDFS-6904. YARN unable to renew delegation token fetched via webhdfs
|
||||||
|
due to incorrect service port. (jitendra)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||||
|
@ -75,44 +75,7 @@
|
|||||||
import org.apache.hadoop.hdfs.web.ParamFilter;
|
import org.apache.hadoop.hdfs.web.ParamFilter;
|
||||||
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
import org.apache.hadoop.hdfs.web.resources.*;
|
||||||
import org.apache.hadoop.hdfs.web.resources.AclPermissionParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.GroupParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.OldSnapshotNameParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.SnapshotNameParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.XAttrNameParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.XAttrSetFlagParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.FsActionParam;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
@ -758,10 +721,15 @@ public Response getRoot(
|
|||||||
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
|
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
|
||||||
final ExcludeDatanodesParam excludeDatanodes,
|
final ExcludeDatanodesParam excludeDatanodes,
|
||||||
@QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
|
@QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
|
||||||
final FsActionParam fsAction
|
final FsActionParam fsAction,
|
||||||
|
@QueryParam(TokenKindParam.NAME) @DefaultValue(TokenKindParam.DEFAULT)
|
||||||
|
final TokenKindParam tokenKind,
|
||||||
|
@QueryParam(TokenServiceParam.NAME) @DefaultValue(TokenServiceParam.DEFAULT)
|
||||||
|
final TokenServiceParam tokenService
|
||||||
) throws IOException, InterruptedException {
|
) throws IOException, InterruptedException {
|
||||||
return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
|
return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
|
||||||
renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes, fsAction);
|
renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes, fsAction,
|
||||||
|
tokenKind, tokenService);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Handle HTTP GET request. */
|
/** Handle HTTP GET request. */
|
||||||
@ -794,11 +762,16 @@ public Response get(
|
|||||||
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
|
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
|
||||||
final ExcludeDatanodesParam excludeDatanodes,
|
final ExcludeDatanodesParam excludeDatanodes,
|
||||||
@QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
|
@QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
|
||||||
final FsActionParam fsAction
|
final FsActionParam fsAction,
|
||||||
|
@QueryParam(TokenKindParam.NAME) @DefaultValue(TokenKindParam.DEFAULT)
|
||||||
|
final TokenKindParam tokenKind,
|
||||||
|
@QueryParam(TokenServiceParam.NAME) @DefaultValue(TokenServiceParam.DEFAULT)
|
||||||
|
final TokenServiceParam tokenService
|
||||||
) throws IOException, InterruptedException {
|
) throws IOException, InterruptedException {
|
||||||
|
|
||||||
init(ugi, delegation, username, doAsUser, path, op, offset, length,
|
init(ugi, delegation, username, doAsUser, path, op, offset, length,
|
||||||
renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction);
|
renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
|
||||||
|
tokenKind, tokenService);
|
||||||
|
|
||||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
||||||
@Override
|
@Override
|
||||||
@ -806,7 +779,8 @@ public Response run() throws IOException, URISyntaxException {
|
|||||||
try {
|
try {
|
||||||
return get(ugi, delegation, username, doAsUser,
|
return get(ugi, delegation, username, doAsUser,
|
||||||
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
|
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
|
||||||
xattrNames, xattrEncoding, excludeDatanodes, fsAction);
|
xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
|
||||||
|
tokenService);
|
||||||
} finally {
|
} finally {
|
||||||
reset();
|
reset();
|
||||||
}
|
}
|
||||||
@ -828,7 +802,9 @@ private Response get(
|
|||||||
final List<XAttrNameParam> xattrNames,
|
final List<XAttrNameParam> xattrNames,
|
||||||
final XAttrEncodingParam xattrEncoding,
|
final XAttrEncodingParam xattrEncoding,
|
||||||
final ExcludeDatanodesParam excludeDatanodes,
|
final ExcludeDatanodesParam excludeDatanodes,
|
||||||
final FsActionParam fsAction
|
final FsActionParam fsAction,
|
||||||
|
final TokenKindParam tokenKind,
|
||||||
|
final TokenServiceParam tokenService
|
||||||
) throws IOException, URISyntaxException {
|
) throws IOException, URISyntaxException {
|
||||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||||
final NamenodeProtocols np = getRPCServer(namenode);
|
final NamenodeProtocols np = getRPCServer(namenode);
|
||||||
@ -885,6 +861,15 @@ private Response get(
|
|||||||
}
|
}
|
||||||
final Token<? extends TokenIdentifier> token = generateDelegationToken(
|
final Token<? extends TokenIdentifier> token = generateDelegationToken(
|
||||||
namenode, ugi, renewer.getValue());
|
namenode, ugi, renewer.getValue());
|
||||||
|
|
||||||
|
final String setServiceName = tokenService.getValue();
|
||||||
|
final String setKind = tokenKind.getValue();
|
||||||
|
if (setServiceName != null) {
|
||||||
|
token.setService(new Text(setServiceName));
|
||||||
|
}
|
||||||
|
if (setKind != null) {
|
||||||
|
token.setKind(new Text(setKind));
|
||||||
|
}
|
||||||
final String js = JsonUtil.toJsonString(token);
|
final String js = JsonUtil.toJsonString(token);
|
||||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||||
}
|
}
|
||||||
|
@ -1210,7 +1210,7 @@ Content-Length: 0
|
|||||||
* Submit a HTTP GET request.
|
* Submit a HTTP GET request.
|
||||||
|
|
||||||
+---------------------------------
|
+---------------------------------
|
||||||
curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETDELEGATIONTOKEN&renewer=<USER>"
|
curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETDELEGATIONTOKEN&renewer=<USER>&service=<SERVICE>&kind=<KIND>"
|
||||||
+---------------------------------
|
+---------------------------------
|
||||||
|
|
||||||
The client receives a response with a {{{Token JSON Schema}<<<Token>>> JSON object}}:
|
The client receives a response with a {{{Token JSON Schema}<<<Token>>> JSON object}}:
|
||||||
@ -1232,7 +1232,10 @@ Transfer-Encoding: chunked
|
|||||||
|
|
||||||
See also:
|
See also:
|
||||||
{{{Renewer}<<<renewer>>>}},
|
{{{Renewer}<<<renewer>>>}},
|
||||||
{{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.getDelegationToken
|
{{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.getDelegationToken,
|
||||||
|
{{{Token Kind}<<<kind>>>}},
|
||||||
|
{{{Token Service}<<<service>>>}}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
** {Get Delegation Tokens}
|
** {Get Delegation Tokens}
|
||||||
@ -2518,6 +2521,46 @@ var tokenProperties =
|
|||||||
{{{Cancel Delegation Token}<<<CANCELDELEGATIONTOKEN>>>}}
|
{{{Cancel Delegation Token}<<<CANCELDELEGATIONTOKEN>>>}}
|
||||||
|
|
||||||
|
|
||||||
|
** {Token Kind}
|
||||||
|
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Name | <<<kind>>> |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Description | The kind of the delegation token requested |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Type | String |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Default Value | \<empty\> (Server sets the default kind for the service) |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Valid Values | A string that represents token kind e.g "HDFS_DELEGATION_TOKEN" or "WEBHDFS delegation" |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Syntax | Any string. |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|
||||||
|
See also:
|
||||||
|
{{{Get Delegation Token}<<<GETDELEGATIONTOKEN>>>}}
|
||||||
|
|
||||||
|
|
||||||
|
** {Token Service}
|
||||||
|
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Name | <<<service>>> |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Description | The name of the service where the token is supposed to be used, e.g. ip:port of the namenode |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Type | String |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Default Value | \<empty\> |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Valid Values | ip:port in string format or logical name of the service |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|| Syntax | Any string. |
|
||||||
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
|
||||||
|
See also:
|
||||||
|
{{{Get Delegation Token}<<<GETDELEGATIONTOKEN>>>}}
|
||||||
|
|
||||||
|
|
||||||
** {Username}
|
** {Username}
|
||||||
|
|
||||||
*----------------+-------------------------------------------------------------------+
|
*----------------+-------------------------------------------------------------------+
|
||||||
|
@ -28,10 +28,15 @@
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.net.URLConnection;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.httpclient.HttpConnection;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
@ -41,22 +46,21 @@
|
|||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
import org.apache.hadoop.hdfs.web.resources.*;
|
||||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
|
|
||||||
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
public class TestWebHdfsTokens {
|
public class TestWebHdfsTokens {
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
@ -234,6 +238,62 @@ public void testLazyTokenFetchForSWebhdfs() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetTokenServiceAndKind() throws Exception {
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
final Configuration clusterConf = new HdfsConfiguration(conf);
|
||||||
|
SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
|
||||||
|
clusterConf.setBoolean(DFSConfigKeys
|
||||||
|
.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||||
|
|
||||||
|
// trick the NN into thinking s[ecurity is enabled w/o it trying
|
||||||
|
// to login from a keytab
|
||||||
|
UserGroupInformation.setConfiguration(clusterConf);
|
||||||
|
cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(0).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
|
||||||
|
final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem
|
||||||
|
(clusterConf, "webhdfs");
|
||||||
|
Whitebox.setInternalState(fs, "canRefreshDelegationToken", true);
|
||||||
|
|
||||||
|
URLConnectionFactory factory = new URLConnectionFactory(new ConnectionConfigurator() {
|
||||||
|
@Override
|
||||||
|
public HttpURLConnection configure(HttpURLConnection conn)
|
||||||
|
throws IOException {
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
}) {
|
||||||
|
@Override
|
||||||
|
public URLConnection openConnection(URL url) throws IOException {
|
||||||
|
return super.openConnection(new URL(url + "&service=foo&kind=bar"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Whitebox.setInternalState(fs, "connectionFactory", factory);
|
||||||
|
Token<?> token1 = fs.getDelegationToken();
|
||||||
|
Assert.assertEquals(new Text("bar"), token1.getKind());
|
||||||
|
|
||||||
|
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
|
||||||
|
Token<DelegationTokenIdentifier> token2 =
|
||||||
|
fs.new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
|
||||||
|
op, null, new RenewerParam(null)) {
|
||||||
|
@Override
|
||||||
|
Token<DelegationTokenIdentifier> decodeResponse(Map<?, ?> json)
|
||||||
|
throws IOException {
|
||||||
|
return JsonUtil.toDelegationToken(json);
|
||||||
|
}
|
||||||
|
}.run();
|
||||||
|
|
||||||
|
Assert.assertEquals(new Text("bar"), token2.getKind());
|
||||||
|
Assert.assertEquals(new Text("foo"), token2.getService());
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private void validateLazyTokenFetch(final Configuration clusterConf) throws Exception{
|
private void validateLazyTokenFetch(final Configuration clusterConf) throws Exception{
|
||||||
|
Loading…
Reference in New Issue
Block a user