HDFS-7959. WebHdfs logging is missing on Datanode (Kihwal Lee via sjlee)
This commit is contained in:
parent
975786492c
commit
bf74dbf80d
@ -300,6 +300,19 @@ log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
|
|||||||
#log4j.appender.nodemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-nodemanager-yyyy_mm_dd.log
|
#log4j.appender.nodemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-nodemanager-yyyy_mm_dd.log
|
||||||
#log4j.appender.nodemanagerrequestlog.RetainDays=3
|
#log4j.appender.nodemanagerrequestlog.RetainDays=3
|
||||||
|
|
||||||
|
|
||||||
|
# WebHdfs request log on datanodes
|
||||||
|
# Specify -Ddatanode.webhdfs.logger=INFO,HTTPDRFA on datanode startup to
|
||||||
|
# direct the log to a separate file.
|
||||||
|
#datanode.webhdfs.logger=INFO,console
|
||||||
|
#log4j.logger.datanode.webhdfs=${datanode.webhdfs.logger}
|
||||||
|
#log4j.appender.HTTPDRFA=org.apache.log4j.DailyRollingFileAppender
|
||||||
|
#log4j.appender.HTTPDRFA.File=${hadoop.log.dir}/hadoop-datanode-webhdfs.log
|
||||||
|
#log4j.appender.HTTPDRFA.layout=org.apache.log4j.PatternLayout
|
||||||
|
#log4j.appender.HTTPDRFA.layout.ConversionPattern=%d{ISO8601} %m%n
|
||||||
|
#log4j.appender.HTTPDRFA.DatePattern=.yyyy-MM-dd
|
||||||
|
|
||||||
|
|
||||||
# Appender for viewing information for errors and warnings
|
# Appender for viewing information for errors and warnings
|
||||||
yarn.ewma.cleanupInterval=300
|
yarn.ewma.cleanupInterval=300
|
||||||
yarn.ewma.messageAgeLimitSeconds=86400
|
yarn.ewma.messageAgeLimitSeconds=86400
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
import static io.netty.handler.codec.http.HttpMethod.PUT;
|
import static io.netty.handler.codec.http.HttpMethod.PUT;
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
|
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
|
import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||||
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
|
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
|
||||||
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME;
|
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HDFS_URI_SCHEME;
|
||||||
@ -53,6 +54,7 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
@ -82,6 +84,7 @@
|
|||||||
|
|
||||||
public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||||
static final Log LOG = LogFactory.getLog(WebHdfsHandler.class);
|
static final Log LOG = LogFactory.getLog(WebHdfsHandler.class);
|
||||||
|
static final Log REQLOG = LogFactory.getLog("datanode.webhdfs");
|
||||||
public static final String WEBHDFS_PREFIX = WebHdfsFileSystem.PATH_PREFIX;
|
public static final String WEBHDFS_PREFIX = WebHdfsFileSystem.PATH_PREFIX;
|
||||||
public static final int WEBHDFS_PREFIX_LENGTH = WEBHDFS_PREFIX.length();
|
public static final int WEBHDFS_PREFIX_LENGTH = WEBHDFS_PREFIX.length();
|
||||||
public static final String APPLICATION_OCTET_STREAM =
|
public static final String APPLICATION_OCTET_STREAM =
|
||||||
@ -98,6 +101,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
|||||||
private String path;
|
private String path;
|
||||||
private ParameterParser params;
|
private ParameterParser params;
|
||||||
private UserGroupInformation ugi;
|
private UserGroupInformation ugi;
|
||||||
|
private DefaultHttpResponse resp = null;
|
||||||
|
|
||||||
public WebHdfsHandler(Configuration conf, Configuration confForCreate)
|
public WebHdfsHandler(Configuration conf, Configuration confForCreate)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
@ -119,12 +123,30 @@ public void channelRead0(final ChannelHandlerContext ctx,
|
|||||||
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void run() throws Exception {
|
public Void run() throws Exception {
|
||||||
handle(ctx, req);
|
try {
|
||||||
|
handle(ctx, req);
|
||||||
|
} finally {
|
||||||
|
String host = null;
|
||||||
|
try {
|
||||||
|
host = ((InetSocketAddress)ctx.channel().remoteAddress()).
|
||||||
|
getAddress().getHostAddress();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Error retrieving hostname: ", e);
|
||||||
|
host = "unknown";
|
||||||
|
}
|
||||||
|
REQLOG.info(host + " " + req.method() + " " + req.uri() + " " +
|
||||||
|
getResponseCode());
|
||||||
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int getResponseCode() {
|
||||||
|
return (resp == null) ? INTERNAL_SERVER_ERROR.code() :
|
||||||
|
resp.status().code();
|
||||||
|
}
|
||||||
|
|
||||||
public void handle(ChannelHandlerContext ctx, HttpRequest req)
|
public void handle(ChannelHandlerContext ctx, HttpRequest req)
|
||||||
throws IOException, URISyntaxException {
|
throws IOException, URISyntaxException {
|
||||||
String op = params.op();
|
String op = params.op();
|
||||||
@ -152,7 +174,7 @@ public void handle(ChannelHandlerContext ctx, HttpRequest req)
|
|||||||
@Override
|
@Override
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
LOG.debug("Error ", cause);
|
LOG.debug("Error ", cause);
|
||||||
DefaultHttpResponse resp = ExceptionHandler.exceptionCaught(cause);
|
resp = ExceptionHandler.exceptionCaught(cause);
|
||||||
resp.headers().set(CONNECTION, CLOSE);
|
resp.headers().set(CONNECTION, CLOSE);
|
||||||
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
||||||
}
|
}
|
||||||
@ -184,7 +206,7 @@ private void onCreate(ChannelHandlerContext ctx)
|
|||||||
path, permission, flags, createParent, replication, blockSize, null,
|
path, permission, flags, createParent, replication, blockSize, null,
|
||||||
bufferSize, null), null);
|
bufferSize, null), null);
|
||||||
|
|
||||||
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, CREATED);
|
resp = new DefaultHttpResponse(HTTP_1_1, CREATED);
|
||||||
|
|
||||||
final URI uri = new URI(HDFS_URI_SCHEME, nnId, path, null, null);
|
final URI uri = new URI(HDFS_URI_SCHEME, nnId, path, null, null);
|
||||||
resp.headers().set(LOCATION, uri.toString());
|
resp.headers().set(LOCATION, uri.toString());
|
||||||
@ -203,7 +225,7 @@ private void onAppend(ChannelHandlerContext ctx) throws IOException {
|
|||||||
DFSClient dfsClient = newDfsClient(nnId, conf);
|
DFSClient dfsClient = newDfsClient(nnId, conf);
|
||||||
OutputStream out = dfsClient.append(path, bufferSize,
|
OutputStream out = dfsClient.append(path, bufferSize,
|
||||||
EnumSet.of(CreateFlag.APPEND), null, null);
|
EnumSet.of(CreateFlag.APPEND), null, null);
|
||||||
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, OK);
|
resp = new DefaultHttpResponse(HTTP_1_1, OK);
|
||||||
resp.headers().set(CONTENT_LENGTH, 0);
|
resp.headers().set(CONTENT_LENGTH, 0);
|
||||||
ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),
|
ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),
|
||||||
new HdfsWriter(dfsClient, out, resp));
|
new HdfsWriter(dfsClient, out, resp));
|
||||||
@ -215,8 +237,8 @@ private void onOpen(ChannelHandlerContext ctx) throws IOException {
|
|||||||
final long offset = params.offset();
|
final long offset = params.offset();
|
||||||
final long length = params.length();
|
final long length = params.length();
|
||||||
|
|
||||||
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
|
resp = new DefaultHttpResponse(HTTP_1_1, OK);
|
||||||
HttpHeaders headers = response.headers();
|
HttpHeaders headers = resp.headers();
|
||||||
// Allow the UI to access the file
|
// Allow the UI to access the file
|
||||||
headers.set(ACCESS_CONTROL_ALLOW_METHODS, GET);
|
headers.set(ACCESS_CONTROL_ALLOW_METHODS, GET);
|
||||||
headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
||||||
@ -240,7 +262,7 @@ private void onOpen(ChannelHandlerContext ctx) throws IOException {
|
|||||||
data = in;
|
data = in;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.write(response);
|
ctx.write(resp);
|
||||||
ctx.writeAndFlush(new ChunkedStream(data) {
|
ctx.writeAndFlush(new ChunkedStream(data) {
|
||||||
@Override
|
@Override
|
||||||
public void close() throws Exception {
|
public void close() throws Exception {
|
||||||
@ -262,7 +284,7 @@ private void onGetFileChecksum(ChannelHandlerContext ctx) throws IOException {
|
|||||||
IOUtils.cleanup(LOG, dfsclient);
|
IOUtils.cleanup(LOG, dfsclient);
|
||||||
}
|
}
|
||||||
final byte[] js = JsonUtil.toJsonString(checksum).getBytes(Charsets.UTF_8);
|
final byte[] js = JsonUtil.toJsonString(checksum).getBytes(Charsets.UTF_8);
|
||||||
DefaultFullHttpResponse resp =
|
resp =
|
||||||
new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(js));
|
new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(js));
|
||||||
|
|
||||||
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON_UTF8);
|
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON_UTF8);
|
||||||
@ -274,8 +296,8 @@ private void onGetFileChecksum(ChannelHandlerContext ctx) throws IOException {
|
|||||||
//Accept preflighted CORS requests
|
//Accept preflighted CORS requests
|
||||||
private void allowCORSOnCreate(ChannelHandlerContext ctx)
|
private void allowCORSOnCreate(ChannelHandlerContext ctx)
|
||||||
throws IOException, URISyntaxException {
|
throws IOException, URISyntaxException {
|
||||||
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
|
resp = new DefaultHttpResponse(HTTP_1_1, OK);
|
||||||
HttpHeaders headers = response.headers();
|
HttpHeaders headers = resp.headers();
|
||||||
headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
||||||
headers.set(ACCESS_CONTROL_ALLOW_HEADERS, ACCEPT);
|
headers.set(ACCESS_CONTROL_ALLOW_HEADERS, ACCEPT);
|
||||||
headers.set(ACCESS_CONTROL_ALLOW_METHODS, PUT);
|
headers.set(ACCESS_CONTROL_ALLOW_METHODS, PUT);
|
||||||
@ -283,7 +305,7 @@ private void allowCORSOnCreate(ChannelHandlerContext ctx)
|
|||||||
headers.set(CONTENT_LENGTH, 0);
|
headers.set(CONTENT_LENGTH, 0);
|
||||||
headers.set(CONNECTION, KEEP_ALIVE);
|
headers.set(CONNECTION, KEEP_ALIVE);
|
||||||
|
|
||||||
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
|
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void writeContinueHeader(ChannelHandlerContext ctx) {
|
private static void writeContinueHeader(ChannelHandlerContext ctx) {
|
||||||
|
Loading…
Reference in New Issue
Block a user