diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java new file mode 100644 index 0000000000..aaa1e44305 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY; + +import java.io.IOException; + +import com.sun.jersey.api.container.ContainerFactory; +import com.sun.jersey.api.core.ApplicationAdapter; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.web.interfaces.StorageHandler; +import org.apache.hadoop.ozone.web.ObjectStoreApplication; +import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer; +import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler; +import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler; + +/** + * Implements object store handling within the DataNode process. This class is + * responsible for initializing and maintaining the RPC clients and servers and + * the web application required for the object store implementation. + */ +public final class ObjectStoreHandler { + + private final ObjectStoreJerseyContainer objectStoreJerseyContainer; + + /** + * Creates a new ObjectStoreHandler. + * + * @param conf configuration + * @throws IOException if there is an I/O error + */ + public ObjectStoreHandler(Configuration conf) throws IOException { + String shType = conf.getTrimmed(DFS_STORAGE_HANDLER_TYPE_KEY, + DFS_STORAGE_HANDLER_TYPE_DEFAULT); + final StorageHandler storageHandler; + if ("distributed".equalsIgnoreCase(shType)) { + storageHandler = new DistributedStorageHandler(); + } else { + if ("local".equalsIgnoreCase(shType)) { + storageHandler = new LocalStorageHandler(); + } else { + throw new IllegalArgumentException( + String.format("Unrecognized value for %s: %s", + DFS_STORAGE_HANDLER_TYPE_KEY, shType)); + } + } + this.objectStoreJerseyContainer = ContainerFactory.createContainer( + ObjectStoreJerseyContainer.class, new ApplicationAdapter( + new ObjectStoreApplication())); + this.objectStoreJerseyContainer.setStorageHandler(storageHandler); + } + + /** + * Returns the initialized web application container. 
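+ * <p> + * A usage sketch (hypothetical caller, not part of this patch; {@code conf} is assumed to be the DataNode configuration): + * <pre>{@code + * ObjectStoreHandler handler = new ObjectStoreHandler(conf); + * ObjectStoreJerseyContainer container = + *     handler.getObjectStoreJerseyContainer(); + * }</pre>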
+ * + * @return initialized web application container + */ + public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() { + return this.objectStoreJerseyContainer; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index cf9fad6218..ee940b60f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -28,6 +28,12 @@ public final class OzoneConfigKeys { public static final String DFS_STORAGE_LOCAL_ROOT = "dfs.ozone.localstorage.root"; public static final String DFS_STORAGE_LOCAL_ROOT_DEFAULT = "/tmp/ozone"; + public static final String DFS_OBJECTSTORE_ENABLED_KEY = + "dfs.objectstore.enabled"; + public static final boolean DFS_OBJECTSTORE_ENABLED_DEFAULT = false; + public static final String DFS_STORAGE_HANDLER_TYPE_KEY = + "dfs.storage.handler.type"; + public static final String DFS_STORAGE_HANDLER_TYPE_DEFAULT = "distributed"; /** * There is no need to instantiate this class. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java new file mode 100644 index 0000000000..d0973478cd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web; + +import org.apache.hadoop.ozone.web.handlers.BucketHandler; +import org.apache.hadoop.ozone.web.handlers.VolumeHandler; + +import javax.ws.rs.core.Application; +import java.util.HashSet; +import java.util.Set; + +/** + * Ozone Application. 
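+ * <p> + * This is a standard JAX-RS {@code Application}: Jersey instantiates each resource class returned by {@link #getClasses()} per request, while {@link #getSingletons()} would supply shared instances (none are registered by this patch).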
+ */ +public class ObjectStoreApplication extends Application { + public ObjectStoreApplication() { + super(); + } + + @Override + public Set<Class<?>> getClasses() { + HashSet<Class<?>> set = new HashSet<>(); + set.add(BucketHandler.class); + set.add(VolumeHandler.class); + return set; + } + + @Override + public Set<Object> getSingletons() { + HashSet<Object> set = new HashSet<>(); + return set; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/headers/Header.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/headers/Header.java index a804235b21..3569a3fcc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/headers/Header.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/headers/Header.java @@ -35,6 +35,7 @@ public final class Header { public static final String OZONE_USER = "x-ozone-user"; public static final String OZONE_SIMPLE_AUTHENTICATION_SCHEME = "OZONE"; public static final String OZONE_VERSION_HEADER = "x-ozone-version"; + public static final String OZONE_V1_VERSION_HEADER = "v1"; public static final String OZONE_LIST_QUERY_SERVICE = "service"; public static final String OZONE_LIST_QUERY_VOLUME = "volume"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/headers/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/headers/package-info.java new file mode 100644 index 0000000000..abef63c933 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/headers/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Ozone HTTP header definitions. + */ +@InterfaceAudience.Private +package org.apache.hadoop.ozone.web.headers; + +import org.apache.hadoop.classification.InterfaceAudience; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/CloseableCleanupListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/CloseableCleanupListener.java new file mode 100644 index 0000000000..5cfaa75dad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/CloseableCleanupListener.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.web.netty; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.apache.hadoop.io.IOUtils; + +import java.io.Closeable; + +/** + * A {@link ChannelFutureListener} that closes {@link Closeable} resources. + */ +final class CloseableCleanupListener implements ChannelFutureListener { + + private final Closeable[] closeables; + + /** + * Creates a new CloseableCleanupListener. + * + * @param closeables any number of closeable resources + */ + public CloseableCleanupListener(Closeable... closeables) { + this.closeables = closeables; + } + + @Override + public void operationComplete(ChannelFuture future) { + IOUtils.cleanup(null, closeables); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreChannelHandler.java new file mode 100644 index 0000000000..b2d4567116 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreChannelHandler.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.web.netty; + +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Abstract base class for the multiple Netty channel handlers used in the + * Object Store Netty channel pipeline. + */ +abstract class ObjectStoreChannelHandler<T> + extends SimpleChannelInboundHandler<T> { + + /** Log usable in all subclasses.
*/ + protected static final Logger LOG = + LoggerFactory.getLogger(ObjectStoreChannelHandler.class); + + /** + * Handles uncaught exceptions in the channel pipeline by sending an internal + * server error response if the channel is still active. + * + * @param ctx ChannelHandlerContext to receive response + * @param cause Throwable that was unhandled in the channel pipeline + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.error("Unexpected exception in Netty pipeline.", cause); + if (ctx.channel().isActive()) { + sendErrorResponse(ctx, INTERNAL_SERVER_ERROR); + } + } + + /** + * Sends an error response. This method is used when an unexpected error is + * encountered within the channel pipeline, outside of the actual Object Store + * application. It always closes the connection, because we can't in general + * know the state of the connection when these errors occur, so attempting to + * keep the connection alive could be unpredictable. + * + * @param ctx ChannelHandlerContext to receive response + * @param status HTTP response status + */ + protected static void sendErrorResponse(ChannelHandlerContext ctx, + HttpResponseStatus status) { + HttpResponse nettyResp = new DefaultFullHttpResponse(HTTP_1_1, status); + nettyResp.headers().set(CONTENT_LENGTH, 0); + nettyResp.headers().set(CONNECTION, CLOSE); + ctx.writeAndFlush(nettyResp).addListener(ChannelFutureListener.CLOSE); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java new file mode 100644 index 0000000000..5bb20666b5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java @@ -0,0 +1,347 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.hadoop.ozone.web.netty; + +import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderNames.HOST; +import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING; +import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE; +import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; + +import com.sun.jersey.core.header.InBoundHeaders; +import com.sun.jersey.spi.container.ContainerRequest; +import com.sun.jersey.spi.container.ContainerResponse; +import com.sun.jersey.spi.container.ContainerResponseWriter; +import com.sun.jersey.spi.container.WebApplication; + +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpHeaderUtil; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.web.interfaces.StorageHandler; +import org.apache.hadoop.ozone.web.handlers.StorageHandlerBuilder; + +/** + * This is a custom Jersey container that hosts the Object Store web + * application. It supports dispatching an inbound Netty {@link HttpRequest} + * to the Object Store Jersey application. Request dispatching must run + * asynchronously, because the Jersey application must consume the inbound + * HTTP request from a piped stream and produce the outbound HTTP response + * for another piped stream. The Netty channel handlers consume the connected + * ends of these piped streams. Request dispatching cannot run directly on + * the Netty threads, or there would be a risk of deadlock (one thread + * producing/consuming its end of the pipe while no other thread is + * producing/consuming the opposite end). + */ +public final class ObjectStoreJerseyContainer { + + private static final Logger LOG = + LoggerFactory.getLogger(ObjectStoreJerseyContainer.class); + + private final WebApplication webapp; + + private StorageHandler storageHandler; + + /** + * Creates a new ObjectStoreJerseyContainer. + * + * @param webapp web application + */ + public ObjectStoreJerseyContainer(WebApplication webapp) { + this.webapp = webapp; + } + + /** + * Sets the {@link StorageHandler}. This must be called before dispatching any + * requests. + * + * @param newStorageHandler {@link StorageHandler} implementation + */ + public void setStorageHandler(StorageHandler newStorageHandler) { + this.storageHandler = newStorageHandler; + } + + /** + * Asynchronously executes an HTTP request.
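+ * <p> + * The returned future completes once Jersey has produced the response status and headers; the response body continues to stream through {@code respOut} afterwards. A caller sketch (hypothetical, for illustration only): + * <pre>{@code + * Future<HttpResponse> f = container.dispatch(nettyReq, reqIn, respOut); + * HttpResponse nettyResp = f.get(30, TimeUnit.SECONDS); + * }</pre>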
+ * + * @param nettyReq HTTP request + * @param reqIn input stream for reading request body + * @param respOut output stream for writing response body + */ + public Future<HttpResponse> dispatch(HttpRequest nettyReq, InputStream reqIn, + OutputStream respOut) { + // The request executes on a separate background thread. As soon as enough + // processing has completed to bootstrap the outbound response, the thread + // counts down on a latch. This latch also unblocks callers trying to get + // the asynchronous response out of the returned future. + final CountDownLatch latch = new CountDownLatch(1); + final RequestRunner runner = new RequestRunner(nettyReq, reqIn, respOut, + latch); + final Thread thread = new Thread(runner); + thread.setDaemon(true); + thread.start(); + return new Future<HttpResponse>() { + + private volatile boolean isCancelled = false; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (latch.getCount() == 0) { + return false; + } + if (!mayInterruptIfRunning) { + return false; + } + if (!thread.isAlive()) { + return false; + } + thread.interrupt(); + try { + thread.join(); + } catch (InterruptedException e) { + LOG.info("Interrupted while attempting to cancel dispatch thread."); + Thread.currentThread().interrupt(); + return false; + } + isCancelled = true; + return true; + } + + @Override + public HttpResponse get() + throws InterruptedException, ExecutionException { + checkCancelled(); + latch.await(); + return this.getOrThrow(); + } + + @Override + public HttpResponse get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + checkCancelled(); + if (!latch.await(timeout, unit)) { + throw new TimeoutException(String.format( + "Timed out waiting for HttpResponse after %d %s.", + timeout, unit.toString().toLowerCase())); + } + return this.getOrThrow(); + } + + @Override + public boolean isCancelled() { + return isCancelled; + } + + @Override + public boolean isDone() { + return !isCancelled && latch.getCount() == 0; + } + + private void checkCancelled() { + if (isCancelled()) { + throw new CancellationException(); + } + } + + private HttpResponse getOrThrow() throws ExecutionException { + try { + return runner.getResponse(); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + }; + } + + /** + * Runs the actual handling of the HTTP request. + */ + private final class RequestRunner implements Runnable, + ContainerResponseWriter { + + private final CountDownLatch latch; + private final HttpRequest nettyReq; + private final InputStream reqIn; + private final OutputStream respOut; + + private Exception exception; + private HttpResponse nettyResp; + + /** + * Creates a new RequestRunner.
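+ * <p> + * The runner reads the request body from {@code reqIn} and hands {@code respOut} to Jersey for the response body; both streams are closed in a finally block so the Netty side of each pipe cannot block forever on an abandoned pipe.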
+ * + * @param nettyReq HTTP request + * @param reqIn input stream for reading request body + * @param respOut output stream for writing response body + * @param latch for coordinating asynchronous return of HTTP response + */ + public RequestRunner(HttpRequest nettyReq, InputStream reqIn, + OutputStream respOut, CountDownLatch latch) { + this.latch = latch; + this.nettyReq = nettyReq; + this.reqIn = reqIn; + this.respOut = respOut; + } + + @Override + public void run() { + LOG.trace("begin RequestRunner, nettyReq = {}", this.nettyReq); + StorageHandlerBuilder.setStorageHandler( + ObjectStoreJerseyContainer.this.storageHandler); + try { + ContainerRequest jerseyReq = nettyRequestToJerseyRequest( + ObjectStoreJerseyContainer.this.webapp, this.nettyReq, this.reqIn); + ObjectStoreJerseyContainer.this.webapp.handleRequest(jerseyReq, this); + } catch (Exception e) { + this.exception = e; + this.latch.countDown(); + } finally { + IOUtils.cleanup(null, this.reqIn, this.respOut); + StorageHandlerBuilder.removeStorageHandler(); + } + LOG.trace("end RequestRunner, nettyReq = {}", this.nettyReq); + } + + /** + * This is a callback triggered by Jersey as soon as dispatch has completed + * to the point of knowing what kind of response to return. We save the + * response and trigger the latch to unblock callers waiting on the + * asynchronous return of the response. Our response always sets a + * Content-Length header. (We do not support Transfer-Encoding: chunked.) + * We also return the output stream for Jersey to use for writing the + * response body. + * + * @param contentLength length of response + * @param jerseyResp HTTP response returned by Jersey + * @return OutputStream for Jersey to use for writing the response body + */ + @Override + public OutputStream writeStatusAndHeaders(long contentLength, + ContainerResponse jerseyResp) { + LOG.trace( + "begin writeStatusAndHeaders, contentLength = {}, jerseyResp = {}.", + contentLength, jerseyResp); + this.nettyResp = jerseyResponseToNettyResponse(jerseyResp); + this.nettyResp.headers().set(CONTENT_LENGTH, Math.max(0, contentLength)); + this.nettyResp.headers().set(CONNECTION, + HttpHeaderUtil.isKeepAlive(this.nettyReq) ? KEEP_ALIVE : CLOSE); + this.latch.countDown(); + LOG.trace( + "end writeStatusAndHeaders, contentLength = {}, jerseyResp = {}.", + contentLength, jerseyResp); + return this.respOut; + } + + /** + * This is a callback triggered by Jersey after it has completed writing the + * response body to the stream. We must close the stream here to unblock + * the Netty thread consuming the last chunk of the response from the input + * end of the piped stream. + * + * @throws IOException if there is an I/O error + */ + @Override + public void finish() throws IOException { + IOUtils.cleanup(null, this.respOut); + } + + /** + * Gets the HTTP response calculated by the Jersey application, or throws an + * exception if an error occurred during processing. It only makes sense to + * call this method after waiting on the latch to trigger. + * + * @return HTTP response + * @throws Exception if there was an error executing the request + */ + public HttpResponse getResponse() throws Exception { + if (this.exception != null) { + throw this.exception; + } + return this.nettyResp; + } + } + + /** + * Converts a Jersey HTTP response object to a Netty HTTP response object. 
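+ * <p> + * Content-Length and Transfer-Encoding are deliberately skipped when copying headers: the container always computes Content-Length itself in {@code writeStatusAndHeaders}, and chunked transfer encoding is not supported.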
+ * + * @param jerseyResp Jersey HTTP response + * @return Netty HTTP response + */ + private static HttpResponse jerseyResponseToNettyResponse( + ContainerResponse jerseyResp) { + HttpResponse nettyResp = new DefaultHttpResponse(HTTP_1_1, + HttpResponseStatus.valueOf(jerseyResp.getStatus())); + for (Map.Entry<String, List<Object>> header : + jerseyResp.getHttpHeaders().entrySet()) { + if (!header.getKey().equalsIgnoreCase(CONTENT_LENGTH.toString()) && + !header.getKey().equalsIgnoreCase(TRANSFER_ENCODING.toString())) { + nettyResp.headers().set(header.getKey(), header.getValue()); + } + } + return nettyResp; + } + + /** + * Converts a Netty HTTP request object to a Jersey HTTP request object. + * + * @param webapp web application + * @param nettyReq Netty HTTP request + * @param reqIn input stream for reading request body + * @return Jersey HTTP request + * @throws URISyntaxException if there is an error handling the request URI + */ + private static ContainerRequest nettyRequestToJerseyRequest( + WebApplication webapp, HttpRequest nettyReq, InputStream reqIn) + throws URISyntaxException { + HttpHeaders nettyHeaders = nettyReq.headers(); + InBoundHeaders jerseyHeaders = new InBoundHeaders(); + for (String name : nettyHeaders.names()) { + jerseyHeaders.put(name, nettyHeaders.getAll(name)); + } + String host = nettyHeaders.get(HOST); + String scheme = host.startsWith("https") ? "https://" : "http://"; + String baseUri = scheme + host + "/"; + String reqUri = scheme + host + nettyReq.uri(); + LOG.trace("baseUri = {}, reqUri = {}", baseUri, reqUri); + return new ContainerRequest(webapp, nettyReq.method().name(), + new URI(baseUri), new URI(reqUri), jerseyHeaders, reqIn); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainerProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainerProvider.java new file mode 100644 index 0000000000..e9439694e8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainerProvider.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.web.netty; + +import com.sun.jersey.api.container.ContainerException; +import com.sun.jersey.api.core.ResourceConfig; +import com.sun.jersey.spi.container.ContainerProvider; +import com.sun.jersey.spi.container.WebApplication; + +/** + * This is a Jersey {@link ContainerProvider} capable of bootstrapping the + * Object Store web application into a custom container. It must be registered + * using the Java service loader mechanism by listing it in + * META-INF/services/com.sun.jersey.spi.container.ContainerProvider.
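+ * <p> + * The service registration file added by this patch contains the single line: + * <pre> + * org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainerProvider + * </pre>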
+ */ +public final class ObjectStoreJerseyContainerProvider + implements ContainerProvider<ObjectStoreJerseyContainer> { + + @Override + public ObjectStoreJerseyContainer createContainer( + Class<ObjectStoreJerseyContainer> type, ResourceConfig conf, + WebApplication webapp) throws ContainerException { + return new ObjectStoreJerseyContainer(webapp); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java new file mode 100644 index 0000000000..df7d50b20b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.web.netty; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderUtil; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.stream.ChunkedStream; +import org.apache.hadoop.io.IOUtils; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.Future; + +/** + * Object Store Netty channel pipeline handler that handles inbound + * {@link HttpContent} fragments for the request body by sending the bytes into + * the pipe so that the application dispatch thread can read it. + * After receiving the {@link LastHttpContent}, this handler also flushes the + * response. + */ +public final class RequestContentObjectStoreChannelHandler + extends ObjectStoreChannelHandler<HttpContent> { + + private final HttpRequest nettyReq; + private final Future<HttpResponse> nettyResp; + private final OutputStream reqOut; + private final InputStream respIn; + + /** + * Creates a new RequestContentObjectStoreChannelHandler.
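+ * <p> + * This handler is installed by {@code RequestDispatchObjectStoreChannelHandler} after the request line and headers have been dispatched, so it only ever sees body fragments of the current request.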
+ * + * @param nettyReq HTTP request + * @param nettyResp asynchronous HTTP response + * @param reqOut output stream for writing request body + * @param respIn input stream for reading response body + */ + public RequestContentObjectStoreChannelHandler(HttpRequest nettyReq, + Future<HttpResponse> nettyResp, OutputStream reqOut, InputStream respIn) { + this.nettyReq = nettyReq; + this.nettyResp = nettyResp; + this.reqOut = reqOut; + this.respIn = respIn; + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, HttpContent content) + throws Exception { + LOG.trace( + "begin RequestContentObjectStoreChannelHandler channelRead0, " + + "ctx = {}, content = {}", ctx, content); + content.content().readBytes(this.reqOut, content.content().readableBytes()); + if (content instanceof LastHttpContent) { + IOUtils.cleanup(null, this.reqOut); + ctx.write(this.nettyResp.get()); + ChannelFuture respFuture = ctx.writeAndFlush(new ChunkedStream( + this.respIn)); + respFuture.addListener(new CloseableCleanupListener(this.respIn)); + if (!HttpHeaderUtil.isKeepAlive(this.nettyReq)) { + respFuture.addListener(ChannelFutureListener.CLOSE); + } + } + LOG.trace( + "end RequestContentObjectStoreChannelHandler channelRead0, " + + "ctx = {}, content = {}", ctx, content); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + super.exceptionCaught(ctx, cause); + IOUtils.cleanup(null, this.reqOut, this.respIn); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java new file mode 100644 index 0000000000..df7edf71ce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ +package org.apache.hadoop.ozone.web.netty; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderUtil; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import org.apache.hadoop.io.IOUtils; + +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.concurrent.Future; + +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Object Store Netty channel pipeline handler that handles an inbound + * {@link HttpRequest} by dispatching it to the Object Store Jersey container. + * The handler establishes two pairs of connected piped streams: one for inbound + * request handling and another for outbound response handling. The relevant + * ends of these pipes are handed off to the Jersey application dispatch and the + * next channel handler, which is responsible for streaming in the inbound + * request body and flushing out the response body. + */ +public final class RequestDispatchObjectStoreChannelHandler + extends ObjectStoreChannelHandler<HttpRequest> { + + private final ObjectStoreJerseyContainer jerseyContainer; + + private PipedInputStream reqIn; + private PipedOutputStream reqOut; + private PipedInputStream respIn; + private PipedOutputStream respOut; + + /** + * Creates a new RequestDispatchObjectStoreChannelHandler. + * + * @param jerseyContainer Object Store application Jersey container for + * request dispatch + */ + public RequestDispatchObjectStoreChannelHandler( + ObjectStoreJerseyContainer jerseyContainer) { + this.jerseyContainer = jerseyContainer; + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, HttpRequest nettyReq) + throws Exception { + LOG.trace("begin RequestDispatchObjectStoreChannelHandler channelRead0, " + + "ctx = {}, nettyReq = {}", ctx, nettyReq); + if (!nettyReq.decoderResult().isSuccess()) { + sendErrorResponse(ctx, BAD_REQUEST); + return; + } + + this.reqIn = new PipedInputStream(); + this.reqOut = new PipedOutputStream(reqIn); + this.respIn = new PipedInputStream(); + this.respOut = new PipedOutputStream(respIn); + + if (HttpHeaderUtil.is100ContinueExpected(nettyReq)) { + LOG.trace("Sending continue response."); + ctx.writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE)); + } + + Future<HttpResponse> nettyResp = this.jerseyContainer.dispatch(nettyReq, + reqIn, respOut); + + ctx.pipeline().replace(this, + RequestContentObjectStoreChannelHandler.class.getSimpleName(), + new RequestContentObjectStoreChannelHandler(nettyReq, nettyResp, + reqOut, respIn)); + + LOG.trace("end RequestDispatchObjectStoreChannelHandler channelRead0, " + + "ctx = {}, nettyReq = {}", ctx, nettyReq); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + super.exceptionCaught(ctx, cause); + IOUtils.cleanup(null, this.reqIn, this.reqOut, this.respIn, this.respOut); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/package-info.java new file mode 100644 index 0000000000..f4aa675b67 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Netty-based HTTP server implementation for Ozone. + */ +@InterfaceAudience.Private +package org.apache.hadoop.ozone.web.netty; + +import org.apache.hadoop.classification.InterfaceAudience; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java new file mode 100644 index 0000000000..75fad530cb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.storage; + +import java.io.IOException; + +import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.ozone.web.handlers.BucketArgs; +import org.apache.hadoop.ozone.web.handlers.UserArgs; +import org.apache.hadoop.ozone.web.handlers.VolumeArgs; +import org.apache.hadoop.ozone.web.interfaces.StorageHandler; +import org.apache.hadoop.ozone.web.response.BucketInfo; +import org.apache.hadoop.ozone.web.response.ListBuckets; +import org.apache.hadoop.ozone.web.response.ListVolumes; +import org.apache.hadoop.ozone.web.response.VolumeInfo; + +/** + * A {@link StorageHandler} implementation that distributes object storage + * across the nodes of an HDFS cluster. 
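+ * <p> + * At this stage of the patch every method below is a stub: mutating calls are no-ops and queries return {@code null} or {@code false}. The {@code local} handler type ({@link org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler}) is the functional back-end exercised by the tests.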
+ */ +public final class DistributedStorageHandler implements StorageHandler { + + @Override + public void createVolume(VolumeArgs args) throws + IOException, OzoneException { + + } + + @Override + public void setVolumeOwner(VolumeArgs args) throws + IOException, OzoneException { + + } + + @Override + public void setVolumeQuota(VolumeArgs args, boolean remove) + throws IOException, OzoneException { + + } + + @Override + public boolean checkVolumeAccess(VolumeArgs args) + throws IOException, OzoneException { + return false; + } + + @Override + public ListVolumes listVolumes(UserArgs args) + throws IOException, OzoneException { + return null; + } + + @Override + public void deleteVolume(VolumeArgs args) + throws IOException, OzoneException { + + } + + @Override + public VolumeInfo getVolumeInfo(VolumeArgs args) + throws IOException, OzoneException { + return null; + } + + @Override + public void createBucket(BucketArgs args) + throws IOException, OzoneException { + + } + + @Override + public void setBucketAcls(BucketArgs args) + throws IOException, OzoneException { + + } + + @Override + public void setBucketVersioning(BucketArgs args) + throws IOException, OzoneException { + + } + + @Override + public void setBucketStorageClass(BucketArgs args) + throws IOException, OzoneException { + + } + + @Override + public void deleteBucket(BucketArgs args) + throws IOException, OzoneException { + + } + + @Override + public void checkBucketAccess(BucketArgs args) + throws IOException, OzoneException { + + } + + @Override + public ListBuckets listBuckets(VolumeArgs args) + throws IOException, OzoneException { + return null; + } + + @Override + public BucketInfo getBucketInfo(BucketArgs args) + throws IOException, OzoneException { + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/package-info.java new file mode 100644 index 0000000000..f5499f5b33 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Ozone storage handler implementation integrating REST interface front-end + * with container data pipeline back-end. 
+ */ +@InterfaceAudience.Private +package org.apache.hadoop.ozone.web.storage; + +import org.apache.hadoop.classification.InterfaceAudience; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/com.sun.jersey.spi.container.ContainerProvider b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/com.sun.jersey.spi.container.ContainerProvider new file mode 100644 index 0000000000..2e103fea7b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/com.sun.jersey.spi.container.ContainerProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainerProvider diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneWebAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneWebAccess.java new file mode 100644 index 0000000000..c1e876337e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneWebAccess.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ + +package org.apache.hadoop.ozone.web; + +import static java.net.HttpURLConnection.HTTP_CREATED; +import static org.apache.hadoop.ozone.web.utils.OzoneUtils.*; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; +import javax.ws.rs.core.HttpHeaders; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.web.headers.Header; +import org.apache.hadoop.ozone.web.utils.OzoneConsts; +import org.apache.hadoop.util.Time; + +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.DefaultHttpClient; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +public class TestOzoneWebAccess { + + private static MiniDFSCluster cluster; + private static int port; + + @Rule + public Timeout timeout = new Timeout(30000); + + /** + * Creates a MiniDFSCluster for testing. + * + * Ozone is made active by setting DFS_OBJECTSTORE_ENABLED_KEY = true and + * DFS_STORAGE_HANDLER_TYPE_KEY = "local", which uses a local directory to + * emulate the Ozone back-end. + * + * @throws IOException + */ + @BeforeClass + public static void init() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + + String path = new Path( + System.getProperty("test.build.data", "target/test/data"), + TestOzoneWebAccess.class.getSimpleName()).toUri().getPath(); + conf.set(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path); + conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true); + conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local"); + + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DataNode dataNode = cluster.getDataNodes().get(0); + port = dataNode.getInfoPort(); + } + + /** + * Shuts down the MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Sends a valid Ozone request. + * + * @throws IOException + */ + @Test + public void testOzoneRequest() throws IOException { + SimpleDateFormat format = + new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US); + HttpClient client = new DefaultHttpClient(); + String volumeName = getRequestID().toLowerCase(Locale.US); + try { + HttpPost httppost = new HttpPost( + String.format("http://localhost:%d/%s", port, volumeName)); + + httppost.addHeader(Header.OZONE_VERSION_HEADER, + Header.OZONE_V1_VERSION_HEADER); + httppost.addHeader(HttpHeaders.DATE, + format.format(new Date(Time.now()))); + httppost.addHeader(HttpHeaders.AUTHORIZATION, + Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " + + OzoneConsts.OZONE_SIMPLE_HDFS_USER); + httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER); + + HttpResponse response = client.execute(httppost); + assertEquals(response.toString(), HTTP_CREATED, + response.getStatusLine().getStatusCode()); + } finally { + client.getConnectionManager().shutdown(); + } + } +}