YARN-7144. Log Aggregation controller should not swallow the exceptions when it calls closeWriter and closeReader. Contributed by Xuan Gong.

Author: Junping Du
Date:   2017-09-06 14:53:31 -07:00
Commit: 22de9449f8
Parent: dd814946f6

2 changed files with 17 additions and 20 deletions
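The substance of the change: commons-io's IOUtils.closeQuietly discards any exception thrown while closing a stream, so failures in closeWriter and closeReader vanished silently. Hadoop's own org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, ...) closes each resource and logs whatever close() throws. Below is a minimal sketch of the behavioural difference; it is an illustration, not the Hadoop implementation, and the exact log level and message are assumptions.

import java.io.Closeable;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CleanupSketch {

  private static final Logger LOG =
      LoggerFactory.getLogger(CleanupSketch.class);

  // Roughly what commons-io IOUtils.closeQuietly does: any failure
  // during close() disappears without a trace.
  static void closeQuietly(Closeable c) {
    try {
      if (c != null) {
        c.close();
      }
    } catch (IOException swallowed) {
      // intentionally ignored -- the behaviour this commit removes
    }
  }

  // Roughly what org.apache.hadoop.io.IOUtils.cleanupWithLogger does:
  // each resource is closed independently, and a close() failure is
  // logged rather than discarded. (Log level here is an assumption.)
  // Example call, mirroring the diff below:
  //   cleanupWithLogger(LOG, scanner, reader, fsDataIStream);
  static void cleanupWithLogger(Logger log, Closeable... closeables) {
    for (Closeable c : closeables) {
      if (c != null) {
        try {
          c.close();
        } catch (Throwable e) {
          log.debug("Exception in closing {}", c, e);
        }
      }
    }
  }
}

Because cleanupWithLogger is varargs, call sites such as the reader's close() can hand several resources to a single call, and a failure closing one no longer prevents the others from being closed.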

AggregatedLogFormat.java

@@ -44,11 +44,8 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.regex.Pattern;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.commons.io.output.WriterOutputStream;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -61,6 +58,7 @@
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.file.tfile.TFile;
@@ -71,7 +69,8 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Times;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
@@ -81,7 +80,8 @@
 @Evolving
 public class AggregatedLogFormat {

-  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
+  private final static Logger LOG = LoggerFactory.getLogger(
+      AggregatedLogFormat.class);
   private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
   private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
   private static final LogKey VERSION_KEY = new LogKey("VERSION");
@@ -247,7 +247,7 @@ public void write(DataOutputStream out, Set<File> pendingUploadFiles)
           in = secureOpenFile(logFile);
         } catch (IOException e) {
           logErrorMessage(logFile, e);
-          IOUtils.closeQuietly(in);
+          IOUtils.cleanupWithLogger(LOG, in);
           continue;
         }

@@ -285,7 +285,7 @@ public void write(DataOutputStream out, Set<File> pendingUploadFiles)
             String message = logErrorMessage(logFile, e);
             out.write(message.getBytes(Charset.forName("UTF-8")));
           } finally {
-            IOUtils.closeQuietly(in);
+            IOUtils.cleanupWithLogger(LOG, in);
           }
         }
       }
@@ -555,7 +555,7 @@ public void close() {
       } catch (Exception e) {
         LOG.warn("Exception closing writer", e);
       } finally {
-        IOUtils.closeQuietly(this.fsDataOStream);
+        IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
       }
     }
   }
@@ -603,7 +603,7 @@ public String getApplicationOwner() throws IOException {
       }
       return null;
     } finally {
-      IOUtils.closeQuietly(ownerScanner);
+      IOUtils.cleanupWithLogger(LOG, ownerScanner);
     }
   }

@@ -649,7 +649,7 @@ public Map<ApplicationAccessType, String> getApplicationAcls()
       }
      return acls;
    } finally {
-      IOUtils.closeQuietly(aclScanner);
+      IOUtils.cleanupWithLogger(LOG, aclScanner);
    }
  }

@@ -774,8 +774,7 @@ public static void readAcontainerLogs(DataInputStream valueStream,
        }
      }
    } finally {
-      IOUtils.closeQuietly(ps);
-      IOUtils.closeQuietly(os);
+      IOUtils.cleanupWithLogger(LOG, ps, os);
    }
  }

@@ -1001,9 +1000,7 @@ public static Pair<String, String> readContainerMetaDataAndSkipData(
    }

    public void close() {
-      IOUtils.closeQuietly(scanner);
-      IOUtils.closeQuietly(reader);
-      IOUtils.closeQuietly(fsDataIStream);
+      IOUtils.cleanupWithLogger(LOG, scanner, reader, fsDataIStream);
    }
  }


LogAggregationFileController.java

@@ -37,9 +37,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -47,6 +44,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -56,6 +54,8 @@
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.webapp.View.ViewContext;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
@ -68,7 +68,7 @@
@Unstable @Unstable
public abstract class LogAggregationFileController { public abstract class LogAggregationFileController {
private static final Log LOG = LogFactory.getLog( private static final Logger LOG = LoggerFactory.getLogger(
LogAggregationFileController.class); LogAggregationFileController.class);
/* /*
@@ -193,7 +193,7 @@ protected PrintStream createPrintStream(String localDir, String nodeId,

  protected void closePrintStream(OutputStream out) {
    if (out != System.out) {
-      IOUtils.closeQuietly(out);
+      IOUtils.cleanupWithLogger(LOG, out);
    }
  }