From 675bca2968700d3627449700b827c3771827cef8 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Tue, 28 Oct 2014 16:53:53 -0700 Subject: [PATCH] HDFS-7280. Use netty 4 in WebImageViewer. Contributed by Haohui Mai. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + hadoop-hdfs-project/hadoop-hdfs/pom.xml | 2 +- .../offlineImageViewer/FSImageHandler.java | 143 ++++++++++-------- .../OfflineImageViewerPB.java | 13 +- .../offlineImageViewer/WebImageViewer.java | 109 +++++++------ .../TestOfflineImageViewer.java | 7 +- .../TestOfflineImageViewerForAcl.java | 5 +- hadoop-project/pom.xml | 6 + 8 files changed, 155 insertions(+), 132 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b5931fcc70..0eb7a14969 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -318,6 +318,8 @@ Release 2.7.0 - UNRELEASED HDFS-7213. processIncrementalBlockReport performance degradation. (Eric Payne via kihwal) + HDFS-7280. Use netty 4 in WebImageViewer. (wheat9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index ae67a7bad4..84b7e688d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -173,7 +173,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> io.netty - netty + netty-all compile diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageHandler.java index dea6422920..eb93c87fc1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FSImageHandler.java @@ -22,105 +22,118 @@ import java.util.List; import java.util.Map; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import static io.netty.handler.codec.http.HttpResponseStatus.*; + +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import static io.netty.handler.codec.http.HttpVersion.*; +import io.netty.handler.codec.http.QueryStringDecoder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.web.JsonUtil; -import org.apache.hadoop.ipc.RemoteException; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.handler.codec.http.DefaultHttpResponse; -import org.jboss.netty.handler.codec.http.HttpHeaders; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.jboss.netty.handler.codec.http.HttpVersion; -import org.jboss.netty.handler.codec.http.QueryStringDecoder; +import org.apache.hadoop.hdfs.web.resources.ExceptionHandler; -import javax.management.Query; /** * Implement the read-only WebHDFS API for fsimage. */ -class FSImageHandler extends SimpleChannelUpstreamHandler { +class FSImageHandler extends SimpleChannelInboundHandler { public static final Log LOG = LogFactory.getLog(FSImageHandler.class); private final FSImageLoader image; + private final ChannelGroup activeChannels; - FSImageHandler(FSImageLoader image) throws IOException { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + activeChannels.add(ctx.channel()); + } + + FSImageHandler(FSImageLoader image, ChannelGroup activeChannels) throws IOException { this.image = image; + this.activeChannels = activeChannels; } @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageEvent e) throws Exception { - ChannelFuture future = e.getFuture(); - try { - future = handleOperation(e); - } finally { - future.addListener(ChannelFutureListener.CLOSE); - } - } - - private ChannelFuture handleOperation(MessageEvent e) - throws IOException { - HttpRequest request = (HttpRequest) e.getMessage(); - HttpResponse response = new DefaultHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json"); - + public void channelRead0(ChannelHandlerContext ctx, HttpRequest request) + throws Exception { if (request.getMethod() != HttpMethod.GET) { - response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED); - return e.getChannel().write(response); + DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, + METHOD_NOT_ALLOWED); + resp.headers().set("Connection", "close"); + ctx.write(resp).addListener(ChannelFutureListener.CLOSE); + return; } QueryStringDecoder decoder = new QueryStringDecoder(request.getUri()); final String op = getOp(decoder); - String content; - String path = null; - try { - path = getPath(decoder); - if ("GETFILESTATUS".equals(op)) { - content = image.getFileStatus(path); - } else if ("LISTSTATUS".equals(op)) { - content = image.listStatus(path); - } else if ("GETACLSTATUS".equals(op)) { - content = image.getAclStatus(path); - } else { - throw new IllegalArgumentException("Invalid value for webhdfs parameter" + " \"op\""); - } - } catch (IllegalArgumentException ex) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - content = JsonUtil.toJsonString(ex); - } catch (FileNotFoundException ex) { - response.setStatus(HttpResponseStatus.NOT_FOUND); - content = JsonUtil.toJsonString(ex); - } catch (Exception ex) { - content = JsonUtil.toJsonString(ex); + final String content; + String path = getPath(decoder); + if ("GETFILESTATUS".equals(op)) { + content = image.getFileStatus(path); + } else if ("LISTSTATUS".equals(op)) { + content = image.listStatus(path); + } else if ("GETACLSTATUS".equals(op)) { + content = image.getAclStatus(path); + } else { + throw new IllegalArgumentException("Invalid value for webhdfs parameter" + " \"op\""); } - HttpHeaders.setContentLength(response, content.length()); - e.getChannel().write(response); - ChannelFuture future = e.getChannel().write(content); + LOG.info("op=" + op + " target=" + path); - LOG.info(response.getStatus().getCode() + " method=" - + request.getMethod().getName() + " op=" + op + " target=" + path); + DefaultFullHttpResponse resp = new DefaultFullHttpResponse( + HTTP_1_1, HttpResponseStatus.OK, + Unpooled.wrappedBuffer(content.getBytes())); + resp.headers().set("Content-Type", "application/json"); + resp.headers().set("Content-Length", resp.content().readableBytes()); + resp.headers().set("Connection", "close"); + ctx.write(resp).addListener(ChannelFutureListener.CLOSE); + } - return future; + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + Exception e = cause instanceof Exception ? (Exception) cause : new + Exception(cause); + final String output = JsonUtil.toJsonString(e); + ByteBuf content = Unpooled.wrappedBuffer(output.getBytes()); + final DefaultFullHttpResponse resp = new DefaultFullHttpResponse( + HTTP_1_1, INTERNAL_SERVER_ERROR, content); + + resp.headers().set("Content-Type", "application/json"); + if (e instanceof IllegalArgumentException) { + resp.setStatus(BAD_REQUEST); + } else if (e instanceof FileNotFoundException) { + resp.setStatus(NOT_FOUND); + } + + resp.headers().set("Content-Length", resp.content().readableBytes()); + resp.headers().set("Connection", "close"); + ctx.write(resp).addListener(ChannelFutureListener.CLOSE); } private static String getOp(QueryStringDecoder decoder) { - Map> parameters = decoder.getParameters(); + Map> parameters = decoder.parameters(); return parameters.containsKey("op") ? parameters.get("op").get(0).toUpperCase() : null; } private static String getPath(QueryStringDecoder decoder) throws FileNotFoundException { - String path = decoder.getPath(); + String path = decoder.path(); if (path.startsWith("/webhdfs/v1/")) { return path.substring(11); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java index 5e776f396f..f02acaed64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java @@ -112,12 +112,12 @@ private static Options buildOptions() { * Command line options * @throws IOException */ - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws Exception { int status = run(args); System.exit(status); } - public static int run(String[] args) throws IOException { + public static int run(String[] args) throws Exception { Options options = buildOptions(); if (args.length == 0) { printUsage(); @@ -159,8 +159,13 @@ public static int run(String[] args) throws IOException { "r")); } else if (processor.equals("Web")) { String addr = cmd.getOptionValue("addr", "localhost:5978"); - new WebImageViewer(NetUtils.createSocketAddr(addr)) - .initServerAndWait(inputFile); + WebImageViewer viewer = new WebImageViewer(NetUtils.createSocketAddr + (addr)); + try { + viewer.start(inputFile); + } finally { + viewer.close(); + } } return 0; } catch (EOFException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/WebImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/WebImageViewer.java index f86bd10328..087972f94c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/WebImageViewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/WebImageViewer.java @@ -17,48 +17,51 @@ */ package org.apache.hadoop.hdfs.tools.offlineImageViewer; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; - +import com.google.common.annotations.VisibleForTesting; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.jboss.netty.handler.codec.http.HttpRequestDecoder; -import org.jboss.netty.handler.codec.http.HttpResponseEncoder; -import org.jboss.netty.handler.codec.string.StringEncoder; -import com.google.common.annotations.VisibleForTesting; +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; /** * WebImageViewer loads a fsimage and exposes read-only WebHDFS API for its * namespace. */ -public class WebImageViewer { +public class WebImageViewer implements Closeable { public static final Log LOG = LogFactory.getLog(WebImageViewer.class); private Channel channel; private InetSocketAddress address; - private final ChannelFactory factory = - new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool(), 1); - private final ServerBootstrap bootstrap = new ServerBootstrap(factory); - static final ChannelGroup allChannels = - new DefaultChannelGroup("WebImageViewer"); + private final ServerBootstrap bootstrap; + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + private final ChannelGroup allChannels; public WebImageViewer(InetSocketAddress address) { this.address = address; + this.bossGroup = new NioEventLoopGroup(); + this.workerGroup = new NioEventLoopGroup(); + this.allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + this.bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class); } /** @@ -66,13 +69,13 @@ public WebImageViewer(InetSocketAddress address) { * @param fsimage the fsimage to load. * @throws IOException if failed to load the fsimage. */ - public void initServerAndWait(String fsimage) throws IOException { - initServer(fsimage); + public void start(String fsimage) throws IOException { try { - channel.getCloseFuture().await(); + initServer(fsimage); + channel.closeFuture().await(); } catch (InterruptedException e) { LOG.info("Interrupted. Stopping the WebImageViewer."); - shutdown(); + close(); } } @@ -82,37 +85,26 @@ public void initServerAndWait(String fsimage) throws IOException { * @throws IOException if fail to load the fsimage. */ @VisibleForTesting - public void initServer(String fsimage) throws IOException { - FSImageLoader loader = FSImageLoader.load(fsimage); + public void initServer(String fsimage) + throws IOException, InterruptedException { + final FSImageLoader loader = FSImageLoader.load(fsimage); - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("channelTracker", new SimpleChannelUpstreamHandler() { + bootstrap.childHandler(new ChannelInitializer() { @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - allChannels.add(e.getChannel()); + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HttpRequestDecoder(), + new StringEncoder(), + new HttpResponseEncoder(), + new FSImageHandler(loader, allChannels)); } }); - pipeline.addLast("httpDecoder", new HttpRequestDecoder()); - pipeline.addLast("requestHandler", new FSImageHandler(loader)); - pipeline.addLast("stringEncoder", new StringEncoder()); - pipeline.addLast("httpEncoder", new HttpResponseEncoder()); - bootstrap.setPipeline(pipeline); - channel = bootstrap.bind(address); + + channel = bootstrap.bind(address).sync().channel(); allChannels.add(channel); - address = (InetSocketAddress) channel.getLocalAddress(); - LOG.info("WebImageViewer started. Listening on " + address.toString() - + ". Press Ctrl+C to stop the viewer."); - } - - /** - * Stop WebImageViewer. - */ - @VisibleForTesting - public void shutdown() { - allChannels.close().awaitUninterruptibly(); - factory.releaseExternalResources(); + address = (InetSocketAddress) channel.localAddress(); + LOG.info("WebImageViewer started. Listening on " + address.toString() + ". Press Ctrl+C to stop the viewer."); } /** @@ -123,4 +115,11 @@ public void shutdown() { public int getPort() { return address.getPort(); } + + @Override + public void close() { + allChannels.close().awaitUninterruptibly(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java index c7e09ea9dc..5c0d26c2ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java @@ -237,7 +237,7 @@ public int compare(FileStatus first, FileStatus second) { } @Test - public void testFileDistributionCalculatorWithOptions() throws IOException { + public void testFileDistributionCalculatorWithOptions() throws Exception { int status = OfflineImageViewerPB.run(new String[] {"-i", originalFsimage.getAbsolutePath(), "-o", "-", "-p", "FileDistribution", "-maxSize", "512", "-step", "8"}); @@ -258,8 +258,7 @@ public void testPBImageXmlWriter() throws IOException, SAXException, } @Test - public void testWebImageViewer() throws IOException, InterruptedException, - URISyntaxException { + public void testWebImageViewer() throws Exception { WebImageViewer viewer = new WebImageViewer( NetUtils.createSocketAddr("localhost:0")); try { @@ -319,7 +318,7 @@ public void testWebImageViewer() throws IOException, InterruptedException, connection.getResponseCode()); } finally { // shutdown the viewer - viewer.shutdown(); + viewer.close(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java index 7b4804ab8e..56963a675d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewerForAcl.java @@ -157,8 +157,7 @@ public static void deleteOriginalFSImage() throws IOException { } @Test - public void testWebImageViewerForAcl() throws IOException, - InterruptedException, URISyntaxException { + public void testWebImageViewerForAcl() throws Exception { WebImageViewer viewer = new WebImageViewer( NetUtils.createSocketAddr("localhost:0")); try { @@ -200,7 +199,7 @@ public void testWebImageViewerForAcl() throws IOException, connection.getResponseCode()); } finally { // shutdown the viewer - viewer.shutdown(); + viewer.close(); } } } diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 612781a3cd..cac900fa53 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -536,6 +536,12 @@ 3.6.2.Final + + io.netty + netty-all + 4.0.23.Final + + commons-io commons-io