diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0a9c8f4df1..d1364d0fd2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1309,6 +1309,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2995. Better handling of expired containers in MapReduce ApplicationMaster. (vinodkv via acmurthy) + MAPREDUCE-2995. Fixed race condition in ContainerLauncher. (vinodkv via + acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 125770de44..982f7d334a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -21,9 +21,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -79,8 +79,8 @@ public class ContainerLauncherImpl extends AbstractService implements private RecordFactory recordFactory; //have a cache/map of UGIs so as to avoid creating too many RPC //client connection objects to the same NodeManager - private Map ugiMap = - new HashMap(); + private ConcurrentMap ugiMap = + new ConcurrentHashMap(); public ContainerLauncherImpl(AppContext context) { super(ContainerLauncherImpl.class.getName()); @@ -142,22 +142,19 @@ protected ContainerManager getCMProxy(ContainerId containerID, UserGroupInformation user = UserGroupInformation.getCurrentUser(); - // TODO: Synchronization problems!! if (UserGroupInformation.isSecurityEnabled()) { - if(!ugiMap.containsKey(containerManagerBindAddr)) { - Token token = - new Token( - containerToken.getIdentifier().array(), - containerToken.getPassword().array(), new Text( - containerToken.getKind()), new Text( - containerToken.getService())); - //the user in createRemoteUser in this context is not important - user = UserGroupInformation.createRemoteUser(containerManagerBindAddr); - user.addToken(token); - ugiMap.put(containerManagerBindAddr, user); - } else { - user = ugiMap.get(containerManagerBindAddr); - } + + Token token = new Token( + containerToken.getIdentifier().array(), containerToken + .getPassword().array(), new Text(containerToken.getKind()), + new Text(containerToken.getService())); + // the user in createRemoteUser in this context is not important + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(containerManagerBindAddr); + ugi.addToken(token); + ugiMap.putIfAbsent(containerManagerBindAddr, ugi); + + user = ugiMap.get(containerManagerBindAddr); } ContainerManager proxy = user.doAs(new PrivilegedAction() {