MAPREDUCE-3257. Added authorization checks for the protocol between ResourceManager and ApplicatoinMaster. Contributed by Vinod K V.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1189630 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5fadd4de2a
commit
db8ac0ec3c
@ -1795,6 +1795,9 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-3175. Add authorization to admin web-pages such as /stacks, /jmx
|
||||
etc. (Jonathan Eagles via acmurthy)
|
||||
|
||||
MAPREDUCE-3257. Added authorization checks for the protocol between
|
||||
ResourceManager and ApplicatoinMaster. (vinodkv via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -27,7 +27,7 @@
|
||||
import org.apache.hadoop.security.token.TokenInfo;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.apache.hadoop.yarn.proto.MRClientProtocol;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSelector;
|
||||
import org.apache.hadoop.yarn.security.client.ClientTokenSelector;
|
||||
|
||||
public class MRClientSecurityInfo extends SecurityInfo {
|
||||
|
||||
@ -51,7 +51,7 @@ public Class<? extends Annotation> annotationType() {
|
||||
@Override
|
||||
public Class<? extends TokenSelector<? extends TokenIdentifier>>
|
||||
value() {
|
||||
return ApplicationTokenSelector.class;
|
||||
return ClientTokenSelector.class;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -26,12 +26,12 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||
@ -85,8 +85,8 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.webapp.WebApp;
|
||||
import org.apache.hadoop.yarn.webapp.WebApps;
|
||||
@ -131,8 +131,8 @@ public void start() {
|
||||
System
|
||||
.getenv(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME);
|
||||
byte[] bytes = Base64.decodeBase64(secretKeyStr);
|
||||
ApplicationTokenIdentifier identifier =
|
||||
new ApplicationTokenIdentifier(this.appContext.getApplicationID());
|
||||
ClientTokenIdentifier identifier = new ClientTokenIdentifier(
|
||||
this.appContext.getApplicationID());
|
||||
secretManager.setMasterKey(identifier, bytes);
|
||||
}
|
||||
server =
|
||||
|
@ -134,7 +134,9 @@ public void run() {
|
||||
// Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
|
||||
// later is just a buffer so we are not always increasing the
|
||||
// pool-size
|
||||
launcherPool.setCorePoolSize(idealPoolSize + INITIAL_POOL_SIZE);
|
||||
int newPoolSize = idealPoolSize + INITIAL_POOL_SIZE;
|
||||
LOG.debug("Setting pool size to " + newPoolSize);
|
||||
launcherPool.setCorePoolSize(newPoolSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,40 +27,30 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
||||
// TODO: Make it avro-ish. TokenIdentifier really isn't serialized
|
||||
// as writable but simply uses readFields method in SaslRpcServer
|
||||
// for deserializatoin.
|
||||
public class ApplicationTokenIdentifier extends TokenIdentifier {
|
||||
|
||||
public static final Text KIND_NAME = new Text("YARN_APPLICATION_TOKEN");
|
||||
|
||||
private Text appId;
|
||||
|
||||
// TODO: Add more information in the tokenID such that it is not
|
||||
// transferrable, more secure etc.
|
||||
|
||||
public ApplicationTokenIdentifier(ApplicationId id) {
|
||||
this.appId = new Text(Integer.toString(id.getId()));
|
||||
}
|
||||
private String applicationAttemptId;
|
||||
|
||||
public ApplicationTokenIdentifier() {
|
||||
this.appId = new Text();
|
||||
}
|
||||
|
||||
public Text getApplicationID() {
|
||||
return appId;
|
||||
public ApplicationTokenIdentifier(ApplicationAttemptId appAttemptId) {
|
||||
this();
|
||||
this.applicationAttemptId = appAttemptId.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
appId.write(out);
|
||||
Text.writeString(out, this.applicationAttemptId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
appId.readFields(in);
|
||||
this.applicationAttemptId = Text.readString(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -70,10 +60,12 @@ public Text getKind() {
|
||||
|
||||
@Override
|
||||
public UserGroupInformation getUser() {
|
||||
if (appId == null || "".equals(appId.toString())) {
|
||||
if (this.applicationAttemptId == null
|
||||
|| "".equals(this.applicationAttemptId.toString())) {
|
||||
return null;
|
||||
}
|
||||
return UserGroupInformation.createRemoteUser(appId.toString());
|
||||
return UserGroupInformation.createRemoteUser(this.applicationAttemptId
|
||||
.toString());
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -28,17 +28,16 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||
|
||||
public class ClientToAMSecretManager extends
|
||||
SecretManager<ApplicationTokenIdentifier> {
|
||||
SecretManager<ClientTokenIdentifier> {
|
||||
|
||||
private static Log LOG = LogFactory.getLog(ClientToAMSecretManager.class);
|
||||
|
||||
// Per application masterkeys for managing client-tokens
|
||||
private Map<Text, SecretKey> masterKeys = new HashMap<Text, SecretKey>();
|
||||
|
||||
public void setMasterKey(ApplicationTokenIdentifier identifier, byte[] key) {
|
||||
public void setMasterKey(ClientTokenIdentifier identifier, byte[] key) {
|
||||
SecretKey sk = SecretManager.createSecretKey(key);
|
||||
Text applicationID = identifier.getApplicationID();
|
||||
this.masterKeys.put(applicationID, sk);
|
||||
@ -51,7 +50,7 @@ public void setMasterKey(ApplicationTokenIdentifier identifier, byte[] key) {
|
||||
}
|
||||
}
|
||||
|
||||
private void addMasterKey(ApplicationTokenIdentifier identifier) {
|
||||
private void addMasterKey(ClientTokenIdentifier identifier) {
|
||||
Text applicationID = identifier.getApplicationID();
|
||||
this.masterKeys.put(applicationID, generateSecret());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -64,7 +63,7 @@ private void addMasterKey(ApplicationTokenIdentifier identifier) {
|
||||
|
||||
// TODO: Handle the masterKey invalidation.
|
||||
public synchronized SecretKey getMasterKey(
|
||||
ApplicationTokenIdentifier identifier) {
|
||||
ClientTokenIdentifier identifier) {
|
||||
Text applicationID = identifier.getApplicationID();
|
||||
if (!this.masterKeys.containsKey(applicationID)) {
|
||||
addMasterKey(identifier);
|
||||
@ -74,7 +73,7 @@ public synchronized SecretKey getMasterKey(
|
||||
|
||||
@Override
|
||||
public synchronized byte[] createPassword(
|
||||
ApplicationTokenIdentifier identifier) {
|
||||
ClientTokenIdentifier identifier) {
|
||||
byte[] password =
|
||||
createPassword(identifier.getBytes(), getMasterKey(identifier));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -85,7 +84,7 @@ public synchronized byte[] createPassword(
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] retrievePassword(ApplicationTokenIdentifier identifier)
|
||||
public byte[] retrievePassword(ClientTokenIdentifier identifier)
|
||||
throws SecretManager.InvalidToken {
|
||||
byte[] password =
|
||||
createPassword(identifier.getBytes(), getMasterKey(identifier));
|
||||
@ -97,8 +96,8 @@ public byte[] retrievePassword(ApplicationTokenIdentifier identifier)
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationTokenIdentifier createIdentifier() {
|
||||
return new ApplicationTokenIdentifier();
|
||||
public ClientTokenIdentifier createIdentifier() {
|
||||
return new ClientTokenIdentifier();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,83 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.security.client;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
||||
public class ClientTokenIdentifier extends TokenIdentifier {
|
||||
|
||||
public static final Text KIND_NAME = new Text("YARN_CLIENT_TOKEN");
|
||||
|
||||
private Text appId;
|
||||
|
||||
// TODO: Add more information in the tokenID such that it is not
|
||||
// transferrable, more secure etc.
|
||||
|
||||
public ClientTokenIdentifier(ApplicationId id) {
|
||||
this.appId = new Text(Integer.toString(id.getId()));
|
||||
}
|
||||
|
||||
public ClientTokenIdentifier() {
|
||||
this.appId = new Text();
|
||||
}
|
||||
|
||||
public Text getApplicationID() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
appId.write(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
appId.readFields(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Text getKind() {
|
||||
return KIND_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UserGroupInformation getUser() {
|
||||
if (appId == null || "".equals(appId.toString())) {
|
||||
return null;
|
||||
}
|
||||
return UserGroupInformation.createRemoteUser(appId.toString());
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public static class Renewer extends Token.TrivialRenewer {
|
||||
@Override
|
||||
protected Text getKind() {
|
||||
return KIND_NAME;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.security.client;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
|
||||
public class ClientTokenSelector implements
|
||||
TokenSelector<ClientTokenIdentifier> {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(ClientTokenSelector.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Token<ClientTokenIdentifier> selectToken(Text service,
|
||||
Collection<Token<? extends TokenIdentifier>> tokens) {
|
||||
if (service == null) {
|
||||
return null;
|
||||
}
|
||||
LOG.info("Looking for a token with service " + service.toString());
|
||||
for (Token<? extends TokenIdentifier> token : tokens) {
|
||||
LOG.info("Token kind is " + token.getKind().toString()
|
||||
+ " and the token's service name is " + token.getService());
|
||||
if (ClientTokenIdentifier.KIND_NAME.equals(token.getKind())
|
||||
&& service.equals(token.getService())) {
|
||||
return (Token<ClientTokenIdentifier>) token;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -25,12 +26,14 @@
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
@ -39,8 +42,8 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
@ -120,12 +123,43 @@ public void start() {
|
||||
super.start();
|
||||
}
|
||||
|
||||
private void authorizeRequest(ApplicationAttemptId appAttemptID)
|
||||
throws YarnRemoteException {
|
||||
|
||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
String appAttemptIDStr = appAttemptID.toString();
|
||||
|
||||
UserGroupInformation remoteUgi;
|
||||
try {
|
||||
remoteUgi = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
String msg = "Cannot obtain the user-name for ApplicationAttemptID: "
|
||||
+ appAttemptIDStr + ". Got exception: "
|
||||
+ StringUtils.stringifyException(e);
|
||||
LOG.warn(msg);
|
||||
throw RPCUtil.getRemoteException(msg);
|
||||
}
|
||||
|
||||
if (!remoteUgi.getUserName().equals(appAttemptIDStr)) {
|
||||
String msg = "Unauthorized request from ApplicationMaster. "
|
||||
+ "Expected ApplicationAttemptID: " + remoteUgi.getUserName()
|
||||
+ " Found: " + appAttemptIDStr;
|
||||
LOG.warn(msg);
|
||||
throw RPCUtil.getRemoteException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request) throws YarnRemoteException {
|
||||
|
||||
ApplicationAttemptId applicationAttemptId = request
|
||||
.getApplicationAttemptId();
|
||||
authorizeRequest(applicationAttemptId);
|
||||
|
||||
ApplicationId appID = applicationAttemptId.getApplicationId();
|
||||
AMResponse lastResponse = responseMap.get(applicationAttemptId);
|
||||
if (lastResponse == null) {
|
||||
@ -170,6 +204,8 @@ public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
|
||||
ApplicationAttemptId applicationAttemptId = request
|
||||
.getApplicationAttemptId();
|
||||
authorizeRequest(applicationAttemptId);
|
||||
|
||||
AMResponse lastResponse = responseMap.get(applicationAttemptId);
|
||||
if (lastResponse == null) {
|
||||
String message = "Application doesn't exist in cache "
|
||||
@ -199,6 +235,7 @@ public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnRemoteException {
|
||||
|
||||
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
|
||||
authorizeRequest(appAttemptId);
|
||||
|
||||
this.amLivelinessMonitor.receivedPing(appAttemptId);
|
||||
|
||||
|
@ -34,8 +34,8 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
@ -233,9 +233,9 @@ protected synchronized void submitApplication(
|
||||
String clientTokenStr = null;
|
||||
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
Token<ApplicationTokenIdentifier> clientToken = new
|
||||
Token<ApplicationTokenIdentifier>(
|
||||
new ApplicationTokenIdentifier(applicationId),
|
||||
Token<ClientTokenIdentifier> clientToken = new
|
||||
Token<ClientTokenIdentifier>(
|
||||
new ClientTokenIdentifier(applicationId),
|
||||
this.clientToAMSecretManager);
|
||||
clientTokenStr = clientToken.encodeToUrlString();
|
||||
LOG.debug("Sending client token as " + clientTokenStr);
|
||||
|
@ -57,6 +57,7 @@
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||
@ -214,7 +215,7 @@ private void setupTokensAndEnv(
|
||||
}
|
||||
|
||||
ApplicationTokenIdentifier id = new ApplicationTokenIdentifier(
|
||||
application.getAppAttemptId().getApplicationId());
|
||||
application.getAppAttemptId());
|
||||
Token<ApplicationTokenIdentifier> token =
|
||||
new Token<ApplicationTokenIdentifier>(id,
|
||||
this.applicationTokenSecretManager);
|
||||
@ -240,7 +241,7 @@ private void setupTokensAndEnv(
|
||||
container.setContainerTokens(
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
|
||||
|
||||
ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier(
|
||||
ClientTokenIdentifier identifier = new ClientTokenIdentifier(
|
||||
application.getAppAttemptId().getApplicationId());
|
||||
SecretKey clientSecretKey =
|
||||
this.clientToAMSecretManager.getMasterKey(identifier);
|
||||
|
@ -0,0 +1,250 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestApplicationMasterLauncher.MockRMWithCustomAMLauncher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestAMAuthorization {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class);
|
||||
|
||||
private static final class MyContainerManager implements ContainerManager {
|
||||
|
||||
Map<String, String> containerEnv;
|
||||
|
||||
public MyContainerManager() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public StartContainerResponse
|
||||
startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
containerEnv = request.getContainerLaunchContext().getEnvironment();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StopContainerResponse stopContainer(StopContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainerStatusResponse getContainerStatus(
|
||||
GetContainerStatusRequest request) throws YarnRemoteException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
|
||||
|
||||
private static final Configuration conf = new Configuration();
|
||||
static {
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
}
|
||||
|
||||
public MockRMWithAMS(ContainerManager containerManager) {
|
||||
super(conf, containerManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Skip the login.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ApplicationMasterService createApplicationMasterService() {
|
||||
|
||||
return new ApplicationMasterService(getRMContext(),
|
||||
this.appTokenSecretManager, this.scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAuthorizedAccess() throws Exception {
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
MockRM rm = new MockRMWithAMS(containerManager);
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
||||
|
||||
RMApp app = rm.submitApp(1024);
|
||||
|
||||
nm1.nodeHeartbeat(true);
|
||||
|
||||
int waitCount = 0;
|
||||
while (containerManager.containerEnv == null && waitCount++ < 20) {
|
||||
LOG.info("Waiting for AM Launch to happen..");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assert.assertNotNull(containerManager.containerEnv);
|
||||
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||
waitForLaunchedState(attempt);
|
||||
|
||||
// Create a client to the RM.
|
||||
final Configuration conf = rm.getConfig();
|
||||
final YarnRPC rpc = YarnRPC.create(conf);
|
||||
final String serviceAddr = conf.get(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
|
||||
UserGroupInformation currentUser = UserGroupInformation
|
||||
.createRemoteUser(applicationAttemptId.toString());
|
||||
String tokenURLEncodedStr = containerManager.containerEnv
|
||||
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
||||
LOG.info("AppMasterToken is " + tokenURLEncodedStr);
|
||||
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
||||
token.decodeFromUrlString(tokenURLEncodedStr);
|
||||
currentUser.addToken(token);
|
||||
|
||||
AMRMProtocol client = currentUser
|
||||
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
||||
@Override
|
||||
public AMRMProtocol run() {
|
||||
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, NetUtils
|
||||
.createSocketAddr(serviceAddr), conf);
|
||||
}
|
||||
});
|
||||
|
||||
RegisterApplicationMasterRequest request = Records
|
||||
.newRecord(RegisterApplicationMasterRequest.class);
|
||||
request.setApplicationAttemptId(applicationAttemptId);
|
||||
client.registerApplicationMaster(request);
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnauthorizedAccess() throws Exception {
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
MockRM rm = new MockRMWithAMS(containerManager);
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
||||
|
||||
RMApp app = rm.submitApp(1024);
|
||||
|
||||
nm1.nodeHeartbeat(true);
|
||||
|
||||
int waitCount = 0;
|
||||
while (containerManager.containerEnv == null && waitCount++ < 20) {
|
||||
LOG.info("Waiting for AM Launch to happen..");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assert.assertNotNull(containerManager.containerEnv);
|
||||
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||
waitForLaunchedState(attempt);
|
||||
|
||||
// Create a client to the RM.
|
||||
final Configuration conf = rm.getConfig();
|
||||
final YarnRPC rpc = YarnRPC.create(conf);
|
||||
final String serviceAddr = conf.get(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
|
||||
UserGroupInformation currentUser = UserGroupInformation
|
||||
.createRemoteUser(applicationAttemptId.toString());
|
||||
String tokenURLEncodedStr = containerManager.containerEnv
|
||||
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
||||
LOG.info("AppMasterToken is " + tokenURLEncodedStr);
|
||||
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
||||
token.decodeFromUrlString(tokenURLEncodedStr);
|
||||
currentUser.addToken(token);
|
||||
|
||||
AMRMProtocol client = currentUser
|
||||
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
||||
@Override
|
||||
public AMRMProtocol run() {
|
||||
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, NetUtils
|
||||
.createSocketAddr(serviceAddr), conf);
|
||||
}
|
||||
});
|
||||
|
||||
RegisterApplicationMasterRequest request = Records
|
||||
.newRecord(RegisterApplicationMasterRequest.class);
|
||||
ApplicationAttemptId otherAppAttemptId = BuilderUtils
|
||||
.newApplicationAttemptId(applicationAttemptId.getApplicationId(), 42);
|
||||
request.setApplicationAttemptId(otherAppAttemptId);
|
||||
try {
|
||||
client.registerApplicationMaster(request);
|
||||
Assert.fail("Should fail with authorization error");
|
||||
} catch (YarnRemoteException e) {
|
||||
Assert.assertEquals("Unauthorized request from ApplicationMaster. "
|
||||
+ "Expected ApplicationAttemptID: "
|
||||
+ applicationAttemptId.toString() + " Found: "
|
||||
+ otherAppAttemptId.toString(), e.getMessage());
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForLaunchedState(RMAppAttempt attempt)
|
||||
throws InterruptedException {
|
||||
int waitCount = 0;
|
||||
while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
|
||||
&& waitCount++ < 20) {
|
||||
LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
|
||||
+ "Current state is " + attempt.getAppAttemptState());
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assert.assertEquals(attempt.getAppAttemptState(),
|
||||
RMAppAttemptState.LAUNCHED);
|
||||
}
|
||||
}
|
@ -22,6 +22,7 @@
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
@ -101,11 +102,17 @@ public GetContainerStatusResponse getContainerStatus(
|
||||
|
||||
}
|
||||
|
||||
private static final class MockRMWithCustomAMLauncher extends MockRM {
|
||||
static class MockRMWithCustomAMLauncher extends MockRM {
|
||||
|
||||
private final ContainerManager containerManager;
|
||||
|
||||
public MockRMWithCustomAMLauncher(ContainerManager containerManager) {
|
||||
this(new Configuration(), containerManager);
|
||||
}
|
||||
|
||||
public MockRMWithCustomAMLauncher(Configuration conf,
|
||||
ContainerManager containerManager) {
|
||||
super(conf);
|
||||
this.containerManager = containerManager;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,28 @@
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
[libdefaults]
|
||||
default_realm = APACHE.ORG
|
||||
udp_preference_limit = 1
|
||||
extra_addresses = 127.0.0.1
|
||||
[realms]
|
||||
APACHE.ORG = {
|
||||
admin_server = localhost:88
|
||||
kdc = localhost:88
|
||||
}
|
||||
[domain_realm]
|
||||
localhost = APACHE.ORG
|
@ -196,8 +196,8 @@ public void test() throws IOException, InterruptedException {
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
final InetSocketAddress schedulerAddr =
|
||||
NetUtils.createSocketAddr(schedulerAddressString);
|
||||
ApplicationTokenIdentifier appTokenIdentifier =
|
||||
new ApplicationTokenIdentifier(appID);
|
||||
ApplicationTokenIdentifier appTokenIdentifier = new ApplicationTokenIdentifier(
|
||||
appAttempt.getAppAttemptId());
|
||||
ApplicationTokenSecretManager appTokenSecretManager =
|
||||
new ApplicationTokenSecretManager();
|
||||
appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
|
||||
|
Loading…
Reference in New Issue
Block a user