YARN-578. Fixed NM to use SecureIOUtils for reading and aggregating logs. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1487672 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-29 23:14:59 +00:00
parent 01f27f5c23
commit 5420f287cc
8 changed files with 322 additions and 20 deletions

View File

@ -368,6 +368,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-715. Fixed unit test failures - TestDistributedShell and
TestUnmanagedAMLauncher. (Vinod Kumar Vavilapalli via sseth)
YARN-578. Fixed NM to use SecureIOUtils for reading and aggregating logs.
(Omkar Vinit Joshi via vinodkv)
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh.

View File

@ -25,8 +25,8 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Writer;
import java.security.PrivilegedExceptionAction;
@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
@ -137,12 +138,15 @@ public class AggregatedLogFormat {
private final List<String> rootLogDirs;
private final ContainerId containerId;
private final String user;
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
public LogValue(List<String> rootLogDirs, ContainerId containerId) {
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user) {
this.rootLogDirs = new ArrayList<String>(rootLogDirs);
this.containerId = containerId;
this.user = user;
// Ensure logs are processed in lexical order
Collections.sort(this.rootLogDirs);
@ -177,18 +181,30 @@ public class AggregatedLogFormat {
// Write the log itself
FileInputStream in = null;
try {
in = new FileInputStream(logFile);
in = SecureIOUtils.openForRead(logFile, getUser(), null);
byte[] buf = new byte[65535];
int len = 0;
while ((len = in.read(buf)) != -1) {
out.write(buf, 0, len);
}
} catch (IOException e) {
String message = "Error aggregating log file. Log file : "
+ logFile.getAbsolutePath() + e.getMessage();
LOG.error(message, e);
out.write(message.getBytes());
} finally {
in.close();
if (in != null) {
in.close();
}
}
}
}
}
// Added for testing purpose.
public String getUser() {
return user;
}
}
public static class LogWriter {

View File

@ -18,13 +18,21 @@
package org.apache.hadoop.yarn.logaggregation;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.Arrays;
import java.util.Collections;
import junit.framework.Assert;
@ -32,11 +40,14 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
@ -44,6 +55,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@ -97,7 +109,7 @@ public class TestAggregatedLogFormat {
LogKey logKey = new LogKey(testContainerId);
LogValue logValue =
new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId);
testContainerId, ugi.getShortUserName());
logWriter.append(logKey, logValue);
logWriter.closeWriter();
@ -131,9 +143,115 @@ public class TestAggregatedLogFormat {
Assert.assertEquals(expectedLength, s.length());
}
@Test(timeout=10000)
public void testContainerLogsFileAccess() throws IOException {
// This test will run only if NativeIO is enabled as SecureIOUtils
// require it to be enabled.
Assume.assumeTrue(NativeIO.isAvailable());
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
File workDir = new File(testWorkDir, "testContainerLogsFileAccess1");
Path remoteAppLogFile =
new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
String data = "Log File content for container : ";
// Creating files for container1. Log aggregator will try to read log files
// with illegal user.
ContainerId testContainerId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
Path appDir =
new Path(srcFileRoot, testContainerId1.getApplicationAttemptId()
.getApplicationId().toString());
Path srcFilePath1 = new Path(appDir, testContainerId1.toString());
String stdout = "stdout";
String stderr = "stderr";
writeSrcFile(srcFilePath1, stdout, data + testContainerId1.toString()
+ stdout);
writeSrcFile(srcFilePath1, stderr, data + testContainerId1.toString()
+ stderr);
UserGroupInformation ugi =
UserGroupInformation.getCurrentUser();
LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId1);
String randomUser = "randomUser";
LogValue logValue =
spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId1, randomUser));
// It is trying simulate a situation where first log file is owned by
// different user (probably symlink) and second one by the user itself.
when(logValue.getUser()).thenReturn(randomUser).thenReturn(
ugi.getShortUserName());
logWriter.append(logKey, logValue);
logWriter.closeWriter();
BufferedReader in =
new BufferedReader(new FileReader(new File(remoteAppLogFile
.toUri().getRawPath())));
String line;
StringBuffer sb = new StringBuffer("");
while ((line = in.readLine()) != null) {
LOG.info(line);
sb.append(line);
}
line = sb.toString();
String stdoutFile1 =
StringUtils.join(
Path.SEPARATOR,
Arrays.asList(new String[] {
srcFileRoot.toUri().toString(),
testContainerId1.getApplicationAttemptId().getApplicationId()
.toString(), testContainerId1.toString(), stderr }));
String message1 =
"Owner '" + ugi.getShortUserName() + "' for path " + stdoutFile1
+ " did not match expected owner '" + randomUser + "'";
String stdoutFile2 =
StringUtils.join(
Path.SEPARATOR,
Arrays.asList(new String[] {
srcFileRoot.toUri().toString(),
testContainerId1.getApplicationAttemptId().getApplicationId()
.toString(), testContainerId1.toString(), stdout }));
String message2 =
"Owner '" + ugi.getShortUserName() + "' for path "
+ stdoutFile2 + " did not match expected owner '"
+ ugi.getShortUserName() + "'";
Assert.assertTrue(line.contains(message1));
Assert.assertFalse(line.contains(message2));
Assert.assertFalse(line.contains(data + testContainerId1.toString()
+ stderr));
Assert.assertTrue(line.contains(data + testContainerId1.toString()
+ stdout));
}
private void writeSrcFile(Path srcFilePath, String fileName, long length)
throws IOException {
OutputStreamWriter osw = getOutputStreamWriter(srcFilePath, fileName);
int ch = filler;
for (int i = 0; i < length; i++) {
osw.write(ch);
}
osw.close();
}
private void writeSrcFile(Path srcFilePath, String fileName, String data)
throws IOException {
OutputStreamWriter osw = getOutputStreamWriter(srcFilePath, fileName);
osw.write(data);
osw.close();
}
private OutputStreamWriter getOutputStreamWriter(Path srcFilePath,
String fileName) throws IOException, FileNotFoundException,
UnsupportedEncodingException {
File dir = new File(srcFilePath.toString());
if (!dir.exists()) {
if (!dir.mkdirs()) {
@ -143,10 +261,6 @@ public class TestAggregatedLogFormat {
File outputFile = new File(new File(srcFilePath.toString()), fileName);
FileOutputStream os = new FileOutputStream(outputFile);
OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8");
int ch = filler;
for (int i = 0; i < length; i++) {
osw.write(ch);
}
osw.close();
return osw;
}
}

View File

@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
[libdefaults]
default_realm = APACHE.ORG
udp_preference_limit = 1
extra_addresses = 127.0.0.1
[realms]
APACHE.ORG = {
admin_server = localhost:88
kdc = localhost:88
}
[domain_realm]
localhost = APACHE.ORG

View File

@ -123,7 +123,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs()));
LogKey logKey = new LogKey(containerId);
LogValue logValue = new LogValue(dirsHandler.getLogDirs(), containerId);
LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
userUgi.getShortUserName());
try {
this.writer.append(logKey, logValue);
} catch (IOException e) {

View File

@ -39,8 +39,8 @@ import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
@ -228,6 +228,27 @@ public class ContainerLogsPage extends NMView {
return;
} else {
FileInputStream logByteStream = null;
try {
logByteStream =
SecureIOUtils.openForRead(logFile, application.getUser(), null);
} catch (IOException e) {
LOG.error(
"Exception reading log file " + logFile.getAbsolutePath(), e);
if (e.getMessage().contains(
"did not match expected owner '" + application.getUser()
+ "'")) {
html.h1("Exception reading log file. Application submitted by '"
+ application.getUser()
+ "' doesn't own requested log file : "
+ logFile.getName());
} else {
html.h1("Exception reading log file. It might be because log "
+ "file was aggregated : " + logFile.getName());
}
return;
}
try {
long toRead = end - start;
if (toRead < logFile.length()) {
@ -236,11 +257,8 @@ public class ContainerLogsPage extends NMView {
logFile.getName(), "?start=0"), "here").
_(" for full log")._();
}
// TODO: Use secure IO Utils to avoid symlink attacks.
// TODO Fix findBugs close warning along with IOUtils change
logByteStream = new FileInputStream(logFile);
IOUtils.skipFully(logByteStream, start);
InputStreamReader reader = new InputStreamReader(logByteStream);
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
@ -260,8 +278,10 @@ public class ContainerLogsPage extends NMView {
reader.close();
} catch (IOException e) {
html.h1("Exception reading log-file. Log file was likely aggregated. "
+ StringUtils.stringifyException(e));
LOG.error(
"Exception reading log file " + logFile.getAbsolutePath(), e);
html.h1("Exception reading log file. It might be because log "
+ "file was aggregated : " + logFile.getName());
} finally {
if (logByteStream != null) {
try {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
@ -126,6 +127,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
@SuppressWarnings("unchecked")
public void testLocalFileDeletionAfterUpload() throws Exception {
this.delSrvc = new DeletionService(createContainerExecutor());
delSrvc = spy(delSrvc);
this.delSrvc.init(conf);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
@ -169,7 +171,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
// ensure filesystems were closed
verify(logAggregationService).closeFileSystems(
any(UserGroupInformation.class));
verify(delSrvc).delete(eq(user), eq((Path) null),
eq(new Path(app1LogDir.getAbsolutePath())));
delSrvc.stop();
String containerIdStr = ConverterUtils.toString(container11);

View File

@ -18,27 +18,48 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Assert;
import org.junit.Test;
import com.google.inject.Injector;
import com.google.inject.Module;
public class TestContainerLogsPage {
@Test(timeout=30000)
@ -69,4 +90,99 @@ public class TestContainerLogsPage {
container1, dirsHandler);
Assert.assertTrue(!(files.get(0).toString().contains("file:")));
}
@Test(timeout = 10000)
public void testContainerLogPageAccess() throws IOException {
// SecureIOUtils require Native IO to be enabled. This test will run
// only if it is enabled.
assumeTrue(NativeIO.isAvailable());
String user = "randomUser" + System.currentTimeMillis();
File absLogDir = null, appDir = null, containerDir = null, syslog = null;
try {
// target log directory
absLogDir =
new File("target", TestContainerLogsPage.class.getSimpleName()
+ "LogDir").getAbsoluteFile();
absLogDir.mkdir();
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LOG_DIRS, absLogDir.toURI().toString());
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
// Add an application and the corresponding containers
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(conf);
long clusterTimeStamp = 1234;
ApplicationId appId =
BuilderUtils.newApplicationId(recordFactory, clusterTimeStamp, 1);
Application app = mock(Application.class);
when(app.getAppId()).thenReturn(appId);
// Making sure that application returns a random user. This is required
// for SecureIOUtils' file owner check.
when(app.getUser()).thenReturn(user);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId container1 =
BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 0);
// Testing secure read access for log files
// Creating application and container directory and syslog file.
appDir = new File(absLogDir, appId.toString());
appDir.mkdir();
containerDir = new File(appDir, container1.toString());
containerDir.mkdir();
syslog = new File(containerDir, "syslog");
syslog.createNewFile();
BufferedOutputStream out =
new BufferedOutputStream(new FileOutputStream(syslog));
out.write("Log file Content".getBytes());
out.close();
ApplicationACLsManager aclsManager = mock(ApplicationACLsManager.class);
Context context = mock(Context.class);
ConcurrentMap<ApplicationId, Application> appMap =
new ConcurrentHashMap<ApplicationId, Application>();
appMap.put(appId, app);
when(context.getApplications()).thenReturn(appMap);
when(context.getContainers()).thenReturn(
new ConcurrentHashMap<ContainerId, Container>());
ContainersLogsBlock cLogsBlock =
new ContainersLogsBlock(conf, context, aclsManager, dirsHandler);
Map<String, String> params = new HashMap<String, String>();
params.put(YarnWebParams.CONTAINER_ID, container1.toString());
params.put(YarnWebParams.CONTAINER_LOG_TYPE, "syslog");
Injector injector =
WebAppTests.testPage(ContainerLogsPage.class,
ContainersLogsBlock.class, cLogsBlock, params, (Module[])null);
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
verify(spyPw).write(
"Exception reading log file. Application submitted by '" + user
+ "' doesn't own requested log file : syslog");
} finally {
if (syslog != null) {
syslog.delete();
}
if (containerDir != null) {
containerDir.delete();
}
if (appDir != null) {
appDir.delete();
}
if (absLogDir != null) {
absLogDir.delete();
}
}
}
}