diff --git a/CHANGES.txt b/CHANGES.txt
index 431e02bdc1..4007ef4a82 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,11 @@ Hadoop Change Log
 
 Trunk (unreleased changes)
 
+  NEW FEATURES
+   HADOOP-6581. Add authenticated TokenIdentifiers to UGI so that 
+   they can be used for authorization (Kan Zhang and Jitendra Pandey 
+   via jghoman)
+
   IMPROVEMENTS
     HADOOP-6644. util.Shell getGROUPS_FOR_USER_COMMAND method name 
     - should use common naming convention (boryas)
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 871859be07..b3f8cc70b2 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -17,7 +17,7 @@
 apacheant.version=1.7.1
 ant-task.version=2.0.10
 
-avro.version=1.3.0
+avro.version=1.3.2
 
 checkstyle.version=4.2
 
diff --git a/src/java/org/apache/hadoop/ipc/Server.java b/src/java/org/apache/hadoop/ipc/Server.java
index 68697325de..b058d0f766 100644
--- a/src/java/org/apache/hadoop/ipc/Server.java
+++ b/src/java/org/apache/hadoop/ipc/Server.java
@@ -923,7 +923,13 @@ public abstract class Server {
       if (authMethod == SaslRpcServer.AuthMethod.DIGEST) {
         TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
             secretManager);
-        return tokenId.getUser();
+        UserGroupInformation ugi = tokenId.getUser();
+        if (ugi == null) {
+          throw new AccessControlException(
+              "Can't retrieve username from tokenIdentifier.");
+        }
+        ugi.addTokenIdentifier(tokenId);
+        return ugi;
       } else {
         return UserGroupInformation.createRemoteUser(authorizedId);
       }
@@ -1531,7 +1537,7 @@ public abstract class Server {
   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
 
   /** Starts the service.  Must be called before any calls will be handled. */
-  public synchronized void start() throws IOException {
+  public synchronized void start() {
     responder.start();
     listener.start();
     handlers = new Handler[handlerCount];
diff --git a/src/java/org/apache/hadoop/security/SaslRpcServer.java b/src/java/org/apache/hadoop/security/SaslRpcServer.java
index 3928f84838..62c704e2a0 100644
--- a/src/java/org/apache/hadoop/security/SaslRpcServer.java
+++ b/src/java/org/apache/hadoop/security/SaslRpcServer.java
@@ -68,10 +68,10 @@ public class SaslRpcServer {
     return Base64.decodeBase64(identifier.getBytes());
   }
 
-  public static TokenIdentifier getIdentifier(String id,
-      SecretManager<TokenIdentifier> secretManager) throws InvalidToken {
+  public static <T extends TokenIdentifier> T getIdentifier(String id,
+      SecretManager<T> secretManager) throws InvalidToken {
     byte[] tokenId = decodeIdentifier(id);
-    TokenIdentifier tokenIdentifier = secretManager.createIdentifier();
+    T tokenIdentifier = secretManager.createIdentifier();
     try {
       tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream(
           tokenId)));
@@ -202,11 +202,12 @@ public class SaslRpcServer {
           ac.setAuthorized(false);
         }
         if (ac.isAuthorized()) {
-          String username = getIdentifier(authzid, secretManager).getUser()
-              .getUserName().toString();
-          if (LOG.isDebugEnabled())
+          if (LOG.isDebugEnabled()) {
+            String username = getIdentifier(authzid, secretManager).getUser()
+            .getUserName().toString();
             LOG.debug("SASL server DIGEST-MD5 callback: setting "
                 + "canonicalized client ID: " + username);
+          }
           ac.setAuthorizedID(authzid);
         }
       }
diff --git a/src/java/org/apache/hadoop/security/UserGroupInformation.java b/src/java/org/apache/hadoop/security/UserGroupInformation.java
index c074c0f5d2..eac37b86d4 100644
--- a/src/java/org/apache/hadoop/security/UserGroupInformation.java
+++ b/src/java/org/apache/hadoop/security/UserGroupInformation.java
@@ -610,6 +610,28 @@ public class UserGroupInformation {
     return null;
   }
 
+  /**
+   * Add a TokenIdentifier to this UGI. The TokenIdentifier has typically been
+   * authenticated by the RPC layer as belonging to the user represented by this
+   * UGI.
+   * 
+   * @param tokenId
+   *          tokenIdentifier to be added
+   * @return true on successful add of new tokenIdentifier
+   */
+  public synchronized boolean addTokenIdentifier(TokenIdentifier tokenId) {
+    return subject.getPublicCredentials().add(tokenId);
+  }
+
+  /**
+   * Get the set of TokenIdentifiers belonging to this UGI
+   * 
+   * @return the set of TokenIdentifiers belonging to this UGI
+   */
+  public synchronized Set<TokenIdentifier> getTokenIdentifiers() {
+    return subject.getPublicCredentials(TokenIdentifier.class);
+  }
+  
   /**
    * Add a token to this UGI
    * 
diff --git a/src/java/org/apache/hadoop/security/token/delegation/DelegationKey.java b/src/java/org/apache/hadoop/security/token/delegation/DelegationKey.java
index 2edb911b80..293821fb7c 100644
--- a/src/java/org/apache/hadoop/security/token/delegation/DelegationKey.java
+++ b/src/java/org/apache/hadoop/security/token/delegation/DelegationKey.java
@@ -27,6 +27,7 @@ import javax.crypto.SecretKey;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.avro.reflect.Nullable;
 
 /**
  * Key used for generating and verifying delegation tokens
@@ -35,7 +36,8 @@ import org.apache.hadoop.io.WritableUtils;
 public class DelegationKey implements Writable {
   private int keyId;
   private long expiryDate;
-  private SecretKey key;
+  @Nullable
+  private byte[] keyBytes = null;
 
   public DelegationKey() {
     this(0, 0L, null);
@@ -44,7 +46,9 @@ public class DelegationKey implements Writable {
   public DelegationKey(int keyId, long expiryDate, SecretKey key) {
     this.keyId = keyId;
     this.expiryDate = expiryDate;
-    this.key = key;
+    if (key!=null) {
+      this.keyBytes = key.getEncoded();
+    }
   }
 
   public int getKeyId() {
@@ -56,7 +60,12 @@ public class DelegationKey implements Writable {
   }
 
   public SecretKey getKey() {
-    return key;
+    if (keyBytes == null || keyBytes.length == 0) {
+      return null;
+    } else {
+      SecretKey key = AbstractDelegationTokenSecretManager.createSecretKey(keyBytes);
+      return key;
+    }
   }
 
   public void setExpiryDate(long expiryDate) {
@@ -68,9 +77,12 @@ public class DelegationKey implements Writable {
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, keyId);
     WritableUtils.writeVLong(out, expiryDate);
-    byte[] keyBytes = key.getEncoded();
-    WritableUtils.writeVInt(out, keyBytes.length);
-    out.write(keyBytes);
+    if (keyBytes == null) {
+      WritableUtils.writeVInt(out, -1);
+    } else {
+      WritableUtils.writeVInt(out, keyBytes.length);
+      out.write(keyBytes);
+    }
   }
 
   /**
@@ -79,8 +91,11 @@ public class DelegationKey implements Writable {
     keyId = WritableUtils.readVInt(in);
     expiryDate = WritableUtils.readVLong(in);
     int len = WritableUtils.readVInt(in);
-    byte[] keyBytes = new byte[len];
-    in.readFully(keyBytes);
-    key = AbstractDelegationTokenSecretManager.createSecretKey(keyBytes);
+    if (len == -1) {
+      keyBytes = null;
+    } else {
+      keyBytes = new byte[len];
+      in.readFully(keyBytes);
+    }
   }
 }
diff --git a/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java b/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java
index 10f210eb47..5e0a30f3dc 100644
--- a/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java
+++ b/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.ipc;
 
 import org.apache.avro.ipc.AvroRemoteException;
-import org.apache.avro.util.Utf8;
 
 @SuppressWarnings("serial")
 public interface AvroTestProtocol {
@@ -27,7 +26,7 @@ public interface AvroTestProtocol {
     public Problem() {}
   }
   void ping();
-  Utf8 echo(Utf8 value);
+  String echo(String value);
   int add(int v1, int v2);
   int error() throws Problem;
 }
diff --git a/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java b/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java
index 8aaa4e94fe..17ace82099 100644
--- a/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java
+++ b/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 
 import org.apache.avro.ipc.AvroRemoteException;
-import org.apache.avro.util.Utf8;
 
 /** Unit tests for AvroRpc. */
 public class TestAvroRpc extends TestCase {
@@ -50,7 +49,7 @@ public class TestAvroRpc extends TestCase {
 
     public void ping() {}
     
-    public Utf8 echo(Utf8 value) { return value; }
+    public String echo(String value) { return value; }
 
     public int add(int v1, int v2) { return v1 + v2; }
 
@@ -74,8 +73,8 @@ public class TestAvroRpc extends TestCase {
       
       proxy.ping();
 
-      Utf8 utf8Result = proxy.echo(new Utf8("hello world"));
-      assertEquals(new Utf8("hello world"), utf8Result);
+      String echo = proxy.echo("hello world");
+      assertEquals("hello world", echo);
 
       int intResult = proxy.add(1, 2);
       assertEquals(3, intResult);
diff --git a/src/test/core/org/apache/hadoop/security/TestUserGroupInformation.java b/src/test/core/org/apache/hadoop/security/TestUserGroupInformation.java
index f8f379c53e..f3f72b25a3 100644
--- a/src/test/core/org/apache/hadoop/security/TestUserGroupInformation.java
+++ b/src/test/core/org/apache/hadoop/security/TestUserGroupInformation.java
@@ -34,7 +34,6 @@ import java.util.List;
 
 import junit.framework.Assert;
 
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -215,6 +214,33 @@ public class TestUserGroupInformation {
     assertTrue(otherSet.contains(t2));
   }
   
+  @Test
+  public void testTokenIdentifiers() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+        "TheDoctor", new String[] { "TheTARDIS" });
+    TokenIdentifier t1 = mock(TokenIdentifier.class);
+    TokenIdentifier t2 = mock(TokenIdentifier.class);
+
+    ugi.addTokenIdentifier(t1);
+    ugi.addTokenIdentifier(t2);
+
+    Collection<TokenIdentifier> z = ugi.getTokenIdentifiers();
+    assertTrue(z.contains(t1));
+    assertTrue(z.contains(t2));
+    assertEquals(2, z.size());
+
+    // ensure that the token identifiers are passed through doAs
+    Collection<TokenIdentifier> otherSet = ugi
+        .doAs(new PrivilegedExceptionAction<Collection<TokenIdentifier>>() {
+          public Collection<TokenIdentifier> run() throws IOException {
+            return UserGroupInformation.getCurrentUser().getTokenIdentifiers();
+          }
+        });
+    assertTrue(otherSet.contains(t1));
+    assertTrue(otherSet.contains(t2));
+    assertEquals(2, otherSet.size());
+  }
+
   @Test
   public void testUGIAuthMethod() throws Exception {
     final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();