YARN-7037. Optimize data transfer with zero-copy approach for containerlogs REST API in NMWebServices. Contributed by Tao Yang.

This commit is contained in:
Junping Du 2017-08-29 15:16:23 -07:00
parent cc8893edc0
commit ad45d19998
2 changed files with 57 additions and 5 deletions

View File

@ -19,9 +19,13 @@
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
@ -195,6 +199,54 @@ public static void outputContainerLog(String containerId, String nodeId,
os.flush();
}
public static void outputContainerLogThroughZeroCopy(String containerId,
String nodeId, String fileName, long fileLength, long outputSize,
String lastModifiedTime, FileInputStream fis, OutputStream os,
ContainerLogAggregationType logType) throws IOException {
long toSkip = 0;
long totalBytesToRead = fileLength;
if (outputSize < 0) {
long absBytes = Math.abs(outputSize);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
} else {
if (outputSize < fileLength) {
totalBytesToRead = outputSize;
}
}
if (totalBytesToRead > 0) {
// output log summary
StringBuilder sb = new StringBuilder();
String containerStr = String.format(
LogToolUtils.CONTAINER_ON_NODE_PATTERN,
containerId, nodeId);
sb.append(containerStr + "\n");
sb.append("LogAggregationType: " + logType + "\n");
sb.append(StringUtils.repeat("=", containerStr.length()) + "\n");
sb.append("LogType:" + fileName + "\n");
sb.append("LogLastModifiedTime:" + lastModifiedTime + "\n");
sb.append("LogLength:" + Long.toString(fileLength) + "\n");
sb.append("LogContents:\n");
byte[] b = sb.toString().getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
// output log content
FileChannel inputChannel = fis.getChannel();
WritableByteChannel outputChannel = Channels.newChannel(os);
long position = toSkip;
while (totalBytesToRead > 0) {
long transferred =
inputChannel.transferTo(position, totalBytesToRead, outputChannel);
totalBytesToRead -= transferred;
position += transferred;
}
os.flush();
}
}
public static boolean outputAggregatedContainerLog(Configuration conf,
ApplicationId appId, String appOwner,
String containerId, String nodeId,

View File

@ -426,11 +426,9 @@ public Response getLogs(
public void write(OutputStream os) throws IOException,
WebApplicationException {
try {
int bufferSize = 65536;
byte[] buf = new byte[bufferSize];
LogToolUtils.outputContainerLog(containerId.toString(),
nmContext.getNodeId().toString(), outputFileName, fileLength,
bytes, lastModifiedTime, fis, os, buf,
LogToolUtils.outputContainerLogThroughZeroCopy(
containerId.toString(), nmContext.getNodeId().toString(),
outputFileName, fileLength, bytes, lastModifiedTime, fis, os,
ContainerLogAggregationType.LOCAL);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogType:" + outputFileName;
@ -451,6 +449,8 @@ public void write(OutputStream os) throws IOException,
Application app = nmContext.getApplications().get(appId);
String appOwner = app == null ? null : app.getUser();
try {
int bufferSize = 65536;
byte[] buf = new byte[bufferSize];
LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(),
appId, appOwner, containerId.toString(),
nmContext.getNodeId().toString(), outputFileName, bytes,