HADOOP-6861. Adds new non-static methods in Credentials to read and write token storage file. Contributed by Jitendra Pandey & Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@966911 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 2010-07-23 00:05:32 +00:00
parent bd802429c1
commit eb10b152fe
6 changed files with 92 additions and 20 deletions

CHANGES.txt

@@ -80,6 +80,9 @@ Trunk (unreleased changes)
     same principal. Now the principal name is a pattern that has _HOST in it.
     (Kan Zhang & Jitendra Pandey via ddas)
 
+    HADOOP-6861. Adds new non-static methods in Credentials to read and
+    write token storage file. (Jitendra Pandey & Owen O'Malley via ddas)
+
   OPTIMIZATIONS
 
   BUG FIXES

src/java/org/apache/hadoop/io/WritableUtils.java

@@ -57,7 +57,8 @@ public static void skipCompressedByteArray(DataInput in) throws IOException {
     }
   }
 
-  public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
+  public static int writeCompressedByteArray(DataOutput out,
+                                             byte[] bytes) throws IOException {
     if (bytes != null) {
       ByteArrayOutputStream bos = new ByteArrayOutputStream();
       GZIPOutputStream gzout = new GZIPOutputStream(bos);

src/java/org/apache/hadoop/security/Credentials.java

@@ -19,13 +19,17 @@
 package org.apache.hadoop.security;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -114,24 +118,59 @@ public void addSecretKey(Text alias, byte[] key) {
   }
 
   /**
-   * Convenience method for reading a file, and loading the Tokens
+   * Convenience method for reading a token storage file, and loading the Tokens
    * therein in the passed UGI
    * @param filename
    * @param conf
-   * @param ugi
    * @throws IOException
    */
-  public static void readTokensAndLoadInUGI(String filename, Configuration conf,
-      UserGroupInformation ugi) throws IOException {
-    Path localTokensFile = new Path (filename);
-    FileSystem localFS = FileSystem.getLocal(conf);
-    FSDataInputStream in = localFS.open(localTokensFile);
-    Credentials ts = new Credentials();
-    ts.readFields(in);
-    for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-      ugi.addToken(token);
+  public void readTokenStorageFile(Path filename,
+                                   Configuration conf) throws IOException {
+    FSDataInputStream in = filename.getFileSystem(conf).open(filename);
+    try {
+      readTokenStorageStream(in);
+    } catch(IOException ioe) {
+      throw new IOException("Exception reading " + filename, ioe);
+    } finally {
+      in.close();
     }
   }
+
+  /**
+   * Convenience method for reading a token storage file directly from a
+   * datainputstream
+   */
+  public void readTokenStorageStream(DataInputStream in) throws IOException {
+    byte[] magic = new byte[TOKEN_STORAGE_MAGIC.length];
+    in.readFully(magic);
+    if (!Arrays.equals(magic, TOKEN_STORAGE_MAGIC)) {
+      throw new IOException("Bad header found in token storage.");
+    }
+    byte version = in.readByte();
+    if (version != TOKEN_STORAGE_VERSION) {
+      throw new IOException("Unknown version " + version +
+                            " in token storage.");
+    }
+    readFields(in);
+  }
+
+  private static final byte[] TOKEN_STORAGE_MAGIC = "HDTS".getBytes();
+  private static final byte TOKEN_STORAGE_VERSION = 0;
+
+  public void writeTokenStorageToStream(DataOutputStream os)
+    throws IOException {
+    os.write(TOKEN_STORAGE_MAGIC);
+    os.write(TOKEN_STORAGE_VERSION);
+    write(os);
+  }
+
+  public void writeTokenStorageFile(Path filename,
+                                    Configuration conf) throws IOException {
+    FSDataOutputStream os = filename.getFileSystem(conf).create(filename);
+    writeTokenStorageToStream(os);
+    os.close();
+  }
 
   /**
    * Stores all the keys to DataOutput
    * @param out
@@ -151,7 +190,8 @@ public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, secretKeysMap.size());
     for(Map.Entry<Text, byte[]> e : secretKeysMap.entrySet()) {
       e.getKey().write(out);
-      WritableUtils.writeCompressedByteArray(out, e.getValue());
+      WritableUtils.writeVInt(out, e.getValue().length);
+      out.write(e.getValue());
     }
   }
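The write() change above replaces WritableUtils.writeCompressedByteArray, which gzips each value, with a plain vint length prefix followed by the raw bytes; gzip's fixed overhead buys nothing for short secret keys. A minimal sketch of one record in the new layout, written and read back with Hadoop's in-memory buffers (the alias and secret here are made up; the matching readFields() change follows in the next hunk):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    public class SecretKeyRecord {
      public static void main(String[] args) throws Exception {
        // Write one record: alias as Text, then a vint length, then raw bytes.
        DataOutputBuffer out = new DataOutputBuffer();
        new Text("my-alias").write(out);
        byte[] secret = "s3cret".getBytes();
        WritableUtils.writeVInt(out, secret.length);
        out.write(secret);

        // Read it back the same way the new readFields() does.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        Text alias = new Text();
        alias.readFields(in);
        byte[] value = new byte[WritableUtils.readVInt(in)];
        in.readFully(value);
      }
    }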
@@ -178,8 +218,23 @@ public void readFields(DataInput in) throws IOException {
     for(int i=0; i<size; i++) {
       Text alias = new Text();
       alias.readFields(in);
-      byte[] key = WritableUtils.readCompressedByteArray(in);
-      secretKeysMap.put(alias, key);
+      int len = WritableUtils.readVInt(in);
+      byte[] value = new byte[len];
+      in.readFully(value);
+      secretKeysMap.put(alias, value);
+    }
+  }
+
+  /**
+   * Copy all of the credentials from one credential object into another.
+   * @param other the credentials to copy
+   */
+  public void addAll(Credentials other) {
+    for(Map.Entry<Text, byte[]> secret: other.secretKeysMap.entrySet()) {
+      secretKeysMap.put(secret.getKey(), secret.getValue());
+    }
+    for(Map.Entry<Text, Token<?>> token: other.tokenMap.entrySet()){
+      tokenMap.put(token.getKey(), token.getValue());
     }
   }
 }
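Taken together, the new non-static API makes Credentials a self-contained token storage container: writeTokenStorageFile() prepends the 4-byte "HDTS" magic and a version byte before the serialized tokens and secret keys, and readTokenStorageFile() validates both on the way back in. A minimal round-trip sketch against the methods added above (the path and alias are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;

    public class TokenStorageRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Credentials creds = new Credentials();
        creds.addSecretKey(new Text("my-alias"), "s3cret".getBytes());

        // writeTokenStorageFile() writes the "HDTS" magic and version byte,
        // then the tokens and secret keys.
        Path file = new Path("file:///tmp/tokens.bin");
        creds.writeTokenStorageFile(file, conf);

        // Any instance can read the file back; the methods are no longer static.
        Credentials loaded = new Credentials();
        loaded.readTokenStorageFile(file, conf);

        // addAll() copies both secret keys and tokens from another instance.
        Credentials merged = new Credentials();
        merged.addAll(loaded);
      }
    }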

src/java/org/apache/hadoop/security/UserGroupInformation.java

@@ -50,6 +50,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -135,6 +136,9 @@ public boolean logout() throws LoginException {
   private static boolean useKerberos;
   /** Server-side groups fetching service */
   private static Groups groups;
+  /** The configuration to use */
+  private static Configuration conf;
+
   public static final long MIN_TIME_BEFORE_RELOGIN = 10 * 60 * 1000L;
@@ -188,6 +192,7 @@ private static synchronized void initialize(Configuration conf) {
                          "configuration", ioe);
     }
     isInitialized = true;
+    UserGroupInformation.conf = conf;
   }
/** /**
@ -398,9 +403,15 @@ static UserGroupInformation getLoginUser() throws IOException {
login.login(); login.login();
loginUser.setLogin(login); loginUser.setLogin(login);
loginUser = new UserGroupInformation(login.getSubject()); loginUser = new UserGroupInformation(login.getSubject());
String tokenFile = System.getenv(HADOOP_TOKEN_FILE_LOCATION); String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
if (tokenFile != null && isSecurityEnabled()) { if (fileLocation != null && isSecurityEnabled()) {
Credentials.readTokensAndLoadInUGI(tokenFile, new Configuration(), loginUser); // load the token storage file and put all of the tokens into the
// user.
Credentials cred = new Credentials();
cred.readTokenStorageFile(new Path("file:///" + fileLocation), conf);
for (Token<?> token: cred.getAllTokens()) {
loginUser.addToken(token);
}
} }
} catch (LoginException le) { } catch (LoginException le) {
throw new IOException("failure to login", le); throw new IOException("failure to login", le);
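The getLoginUser() change wires the two halves together: a parent process writes a token storage file and points a child at it, and the child's login adds every token in the file when security is enabled. A sketch of the producing side, assuming the HADOOP_TOKEN_FILE_LOCATION constant names an environment variable of the same name (the child command and path are made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.Credentials;

    public class LaunchWithTokens {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Tokens would normally come from a NameNode or JobTracker; assume
        // 'creds' has already been populated.
        Credentials creds = new Credentials();
        String location = "/tmp/child-tokens.bin";
        creds.writeTokenStorageFile(new Path("file:///" + location), conf);

        // The child's login path reads HADOOP_TOKEN_FILE_LOCATION and adds
        // every token in the file to its login user (see the diff above).
        ProcessBuilder pb = new ProcessBuilder("bin/hadoop", "SomeChildMain");
        pb.environment().put("HADOOP_TOKEN_FILE_LOCATION", location);
        pb.start();
      }
    }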

src/java/org/apache/hadoop/util/GenericOptionsParser.java

@@ -317,7 +317,9 @@ private void processGeneralOptions(Configuration conf,
         throw new FileNotFoundException("File "+fileName+" does not exist.");
       }
       LOG.debug("setting conf tokensFile: " + fileName);
-      conf.set("tokenCacheFile", localFs.makeQualified(p).toString());
+      conf.set("mapreduce.job.credentials.json", localFs.makeQualified(p)
+          .toString());
     }
   }
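Anything that previously read the tokenCacheFile configuration key must now read mapreduce.job.credentials.json. A condensed sketch of the option flow, assuming the command-line flag is -tokenCacheFile as the test name below suggests (the path is made up, and the file must already exist or processGeneralOptions throws FileNotFoundException):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class TokenCacheFileOption {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] opts = {"-tokenCacheFile", "/tmp/tokens.bin"};
        new GenericOptionsParser(conf, opts);

        // The qualified path now lands under the new key.
        String cacheFile = conf.get("mapreduce.job.credentials.json");
        System.out.println("token cache file: " + cacheFile);
      }
    }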

src/test/core/org/apache/hadoop/util/TestGenericOptionsParser.java

@@ -126,7 +126,7 @@ public void testTokenCacheOption() throws IOException {
     Path tmpPath = new Path(tmpFile.toString());
     localFs.create(tmpPath);
     new GenericOptionsParser(conf, args);
-    String fileName = conf.get("tokenCacheFile");
+    String fileName = conf.get("mapreduce.job.credentials.json");
     assertNotNull("files is null", fileName);
     assertEquals("files option does not match",
         localFs.makeQualified(tmpPath).toString(), fileName);