YARN-1254. Fixed NodeManager to not pollute container's credentials. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529382 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fc23fd3121
commit
0a887a0910
@ -171,6 +171,9 @@ Release 2.1.2 - UNRELEASED
|
|||||||
YARN-1167. Fixed Distributed Shell to not incorrectly show empty hostname
|
YARN-1167. Fixed Distributed Shell to not incorrectly show empty hostname
|
||||||
on RM UI. (Xuan Gong via vinodkv)
|
on RM UI. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
|
YARN-1254. Fixed NodeManager to not pollute container's credentials. (Omkar
|
||||||
|
Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
Release 2.1.1-beta - 2013-09-23
|
Release 2.1.1-beta - 2013-09-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -65,7 +65,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.FSDownload;
|
import org.apache.hadoop.yarn.util.FSDownload;
|
||||||
|
|
||||||
@ -130,9 +129,12 @@ public int runLocalization(final InetSocketAddress nmAddr)
|
|||||||
try {
|
try {
|
||||||
// assume credentials in cwd
|
// assume credentials in cwd
|
||||||
// TODO: Fix
|
// TODO: Fix
|
||||||
credFile = lfs.open(
|
Path tokenPath =
|
||||||
new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId)));
|
new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId));
|
||||||
|
credFile = lfs.open(tokenPath);
|
||||||
creds.readTokenStorageStream(credFile);
|
creds.readTokenStorageStream(credFile);
|
||||||
|
// Explicitly deleting token file.
|
||||||
|
lfs.delete(tokenPath, false);
|
||||||
} finally {
|
} finally {
|
||||||
if (credFile != null) {
|
if (credFile != null) {
|
||||||
credFile.close();
|
credFile.close();
|
||||||
|
@ -1017,6 +1017,7 @@ private void writeCredentials(Path nmPrivateCTokensPath)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
credentials = new Credentials(credentials);
|
||||||
LocalizerTokenIdentifier id = secretManager.createIdentifier();
|
LocalizerTokenIdentifier id = secretManager.createIdentifier();
|
||||||
Token<LocalizerTokenIdentifier> localizerToken =
|
Token<LocalizerTokenIdentifier> localizerToken =
|
||||||
new Token<LocalizerTokenIdentifier>(id, secretManager);
|
new Token<LocalizerTokenIdentifier>(id, secretManager);
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@ -46,6 +47,8 @@
|
|||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
@ -77,6 +80,7 @@
|
|||||||
|
|
||||||
public class TestContainerLocalizer {
|
public class TestContainerLocalizer {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(TestContainerLocalizer.class);
|
||||||
static final Path basedir =
|
static final Path basedir =
|
||||||
new Path("target", TestContainerLocalizer.class.getName());
|
new Path("target", TestContainerLocalizer.class.getName());
|
||||||
|
|
||||||
@ -94,7 +98,10 @@ public class TestContainerLocalizer {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContainerLocalizerMain() throws Exception {
|
public void testContainerLocalizerMain() throws Exception {
|
||||||
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
|
ContainerLocalizer localizer =
|
||||||
|
setupContainerLocalizerForTest();
|
||||||
|
|
||||||
// verify created cache
|
// verify created cache
|
||||||
List<Path> privCacheList = new ArrayList<Path>();
|
List<Path> privCacheList = new ArrayList<Path>();
|
||||||
@ -190,11 +197,25 @@ public boolean matches(Object o) {
|
|||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testLocalizerTokenIsGettingRemoved() throws Exception {
|
||||||
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
||||||
|
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
||||||
|
any(CompletionService.class), any(UserGroupInformation.class));
|
||||||
|
localizer.runLocalization(nmAddr);
|
||||||
|
verify(spylfs, times(1)).delete(tokenPath, false);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked") // mocked generics
|
@SuppressWarnings("unchecked") // mocked generics
|
||||||
public void testContainerLocalizerClosesFilesystems() throws Exception {
|
public void testContainerLocalizerClosesFilesystems() throws Exception {
|
||||||
// verify filesystems are closed when localizer doesn't fail
|
// verify filesystems are closed when localizer doesn't fail
|
||||||
|
FileContext fs = FileContext.getLocalFSFileContext();
|
||||||
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
ContainerLocalizer localizer = setupContainerLocalizerForTest();
|
||||||
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
|
||||||
any(CompletionService.class), any(UserGroupInformation.class));
|
any(CompletionService.class), any(UserGroupInformation.class));
|
||||||
@ -203,6 +224,7 @@ public void testContainerLocalizerClosesFilesystems() throws Exception {
|
|||||||
localizer.runLocalization(nmAddr);
|
localizer.runLocalization(nmAddr);
|
||||||
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
|
||||||
|
|
||||||
|
spylfs = spy(fs.getDefaultFileSystem());
|
||||||
// verify filesystems are closed when localizer fails
|
// verify filesystems are closed when localizer fails
|
||||||
localizer = setupContainerLocalizerForTest();
|
localizer = setupContainerLocalizerForTest();
|
||||||
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
|
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
|
||||||
@ -217,7 +239,6 @@ public void testContainerLocalizerClosesFilesystems() throws Exception {
|
|||||||
@SuppressWarnings("unchecked") // mocked generics
|
@SuppressWarnings("unchecked") // mocked generics
|
||||||
private ContainerLocalizer setupContainerLocalizerForTest()
|
private ContainerLocalizer setupContainerLocalizerForTest()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
|
|
||||||
// don't actually create dirs
|
// don't actually create dirs
|
||||||
doNothing().when(spylfs).mkdir(
|
doNothing().when(spylfs).mkdir(
|
||||||
isA(Path.class), isA(FsPermission.class), anyBoolean());
|
isA(Path.class), isA(FsPermission.class), anyBoolean());
|
||||||
@ -245,10 +266,10 @@ private ContainerLocalizer setupContainerLocalizerForTest()
|
|||||||
containerId)));
|
containerId)));
|
||||||
doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
|
doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
|
||||||
).when(spylfs).open(tokenPath);
|
).when(spylfs).open(tokenPath);
|
||||||
|
|
||||||
nmProxy = mock(LocalizationProtocol.class);
|
nmProxy = mock(LocalizationProtocol.class);
|
||||||
doReturn(nmProxy).when(localizer).getProxy(nmAddr);
|
doReturn(nmProxy).when(localizer).getProxy(nmAddr);
|
||||||
doNothing().when(localizer).sleep(anyInt());
|
doNothing().when(localizer).sleep(anyInt());
|
||||||
|
|
||||||
|
|
||||||
// return result instantly for deterministic test
|
// return result instantly for deterministic test
|
||||||
ExecutorService syncExec = mock(ExecutorService.class);
|
ExecutorService syncExec = mock(ExecutorService.class);
|
||||||
|
Loading…
Reference in New Issue
Block a user