From 7b7d186b8235e5ffbd49bc982210c0c9fa3b8632 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 9 Mar 2017 11:03:20 -0800 Subject: [PATCH] HDFS-11513. Ozone: Separate XceiverServer and XceiverClient into interfaces and implementations. --- .../org/apache/hadoop/scm/XceiverClient.java | 24 ++------ .../hadoop/scm/XceiverClientManager.java | 12 ++-- .../apache/hadoop/scm/XceiverClientSpi.java | 56 +++++++++++++++++++ .../scm/client/ContainerOperationClient.java | 4 +- .../hadoop/scm/storage/ChunkInputStream.java | 6 +- .../hadoop/scm/storage/ChunkOutputStream.java | 6 +- .../scm/storage/ContainerProtocolCalls.java | 16 +++--- .../transport/server/XceiverServer.java | 14 +---- .../transport/server/XceiverServerSpi.java | 30 ++++++++++ .../container/ozoneimpl/OzoneContainer.java | 3 +- .../storage/DistributedStorageHandler.java | 49 +++++++--------- .../ozone/scm/TestContainerSmallFile.java | 8 +-- 12 files changed, 140 insertions(+), 88 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java index e1a1a8bb56..c6e47c899e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java @@ -33,14 +33,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.util.concurrent.TimeUnit; /** * A Client for the storageContainer protocol. */ -public class XceiverClient implements Closeable { +public class XceiverClient implements XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); private final Pipeline pipeline; private final Configuration config; @@ -61,9 +60,7 @@ public XceiverClient(Pipeline pipeline, Configuration config) { this.config = config; } - /** - * Connects to the leader in the pipeline. - */ + @Override public void connect() throws Exception { if (channelFuture != null && channelFuture.channel() != null @@ -90,9 +87,6 @@ public void connect() throws Exception { channelFuture = b.connect(leader.getHostName(), port).sync(); } - /** - * Close the client. - */ @Override public void close() { if(group != null) { @@ -104,22 +98,12 @@ public void close() { } } - /** - * Returns the pipeline of machines that host the container used by this - * client. - * - * @return pipeline of machines that host the container - */ + @Override public Pipeline getPipeline() { return pipeline; } - /** - * Sends a given command to server and gets the reply back. - * @param request Request - * @return Response to the command - * @throws IOException - */ + @Override public ContainerProtos.ContainerCommandResponseProto sendCommand( ContainerProtos.ContainerCommandRequestProto request) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java index de706cb4b6..82e7e2aa90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java @@ -96,7 +96,7 @@ public void onRemoval( * @return XceiverClient connected to a container * @throws IOException if an XceiverClient cannot be acquired */ - public XceiverClient acquireClient(Pipeline pipeline) throws IOException { + public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException { Preconditions.checkNotNull(pipeline); Preconditions.checkArgument(pipeline.getMachines() != null); Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); @@ -109,7 +109,7 @@ public XceiverClient acquireClient(Pipeline pipeline) throws IOException { return info.getXceiverClient(); } else { // connection not found, create new, add reference and return - XceiverClient xceiverClient = new XceiverClient(pipeline, conf); + XceiverClientSpi xceiverClient = new XceiverClient(pipeline, conf); try { xceiverClient.connect(); } catch (Exception e) { @@ -129,7 +129,7 @@ public XceiverClient acquireClient(Pipeline pipeline) throws IOException { * * @param xceiverClient client to release */ - public void releaseClient(XceiverClient xceiverClient) { + public void releaseClient(XceiverClientSpi xceiverClient) { Preconditions.checkNotNull(xceiverClient); String containerName = xceiverClient.getPipeline().getContainerName(); XceiverClientWithAccessInfo info; @@ -147,10 +147,10 @@ public void releaseClient(XceiverClient xceiverClient) { * - a reference count, +1 when acquire, -1 when release */ private static class XceiverClientWithAccessInfo { - final private XceiverClient xceiverClient; + final private XceiverClientSpi xceiverClient; final private AtomicInteger referenceCount; - XceiverClientWithAccessInfo(XceiverClient xceiverClient) { + XceiverClientWithAccessInfo(XceiverClientSpi xceiverClient) { this.xceiverClient = xceiverClient; this.referenceCount = new AtomicInteger(0); } @@ -167,7 +167,7 @@ boolean hasRefence() { return this.referenceCount.get() != 0; } - XceiverClient getXceiverClient() { + XceiverClientSpi getXceiverClient() { return xceiverClient; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java new file mode 100644 index 0000000000..1cf5a2859c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.scm; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A Client for the storageContainer protocol. + */ +public interface XceiverClientSpi extends Closeable { + /** + * Connects to the leader in the pipeline. + */ + void connect() throws Exception; + + @Override + void close(); + + /** + * Returns the pipeline of machines that host the container used by this + * client. + * + * @return pipeline of machines that host the container + */ + Pipeline getPipeline(); + + /** + * Sends a given command to server and gets the reply back. + * @param request Request + * @return Response to the command + * @throws IOException + */ + ContainerCommandResponseProto sendCommand( + ContainerCommandRequestProto request) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java index 5790ad688a..aa76b188c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.scm.client; +import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.storage.ContainerProtocolCalls; @@ -75,7 +75,7 @@ public static void setContainerSizeB(long size) { @Override public Pipeline createContainer(String containerId) throws IOException { - XceiverClient client = null; + XceiverClientSpi client = null; try { Pipeline pipeline = storageContainerLocationClient.allocateContainer(containerId); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java index 1206ecd2eb..8ca008ca44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.scm.XceiverClient; +import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.XceiverClientManager; /** @@ -47,7 +47,7 @@ public class ChunkInputStream extends InputStream { private final String key; private final String traceID; private XceiverClientManager xceiverClientManager; - private XceiverClient xceiverClient; + private XceiverClientSpi xceiverClient; private List chunks; private int chunkOffset; private List buffers; @@ -63,7 +63,7 @@ public class ChunkInputStream extends InputStream { * @param traceID container protocol call traceID */ public ChunkInputStream(String key, XceiverClientManager xceiverClientManager, - XceiverClient xceiverClient, List chunks, String traceID) { + XceiverClientSpi xceiverClient, List chunks, String traceID) { this.key = key; this.traceID = traceID; this.xceiverClientManager = xceiverClientManager; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java index 0126e582fa..3e9a3d4db6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java @@ -32,8 +32,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue; import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; /** * An {@link OutputStream} used by the REST service in combination with the @@ -58,7 +58,7 @@ public class ChunkOutputStream extends OutputStream { private final String traceID; private final KeyData.Builder containerKeyData; private XceiverClientManager xceiverClientManager; - private XceiverClient xceiverClient; + private XceiverClientSpi xceiverClient; private ByteBuffer buffer; private final String streamId; private int chunkIndex; @@ -73,7 +73,7 @@ public class ChunkOutputStream extends OutputStream { * @param traceID container protocol call args */ public ChunkOutputStream(String containerKey, String key, - XceiverClientManager xceiverClientManager, XceiverClient xceiverClient, + XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String traceID) { this.containerKey = containerKey; this.key = key; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java index d2c944b451..f345aa884b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java @@ -45,10 +45,10 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos .WriteChunkRequestProto; -import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import java.io.IOException; +import org.apache.hadoop.scm.XceiverClientSpi; /** * Implementation of all container protocol calls performed by Container @@ -71,7 +71,7 @@ private ContainerProtocolCalls() { * @return container protocol get key response * @throws IOException if there is an I/O error while performing the call */ - public static GetKeyResponseProto getKey(XceiverClient xceiverClient, + public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient, KeyData containerKeyData, String traceID) throws IOException { GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto .newBuilder() @@ -96,7 +96,7 @@ public static GetKeyResponseProto getKey(XceiverClient xceiverClient, * @param traceID container protocol call args * @throws IOException if there is an I/O error while performing the call */ - public static void putKey(XceiverClient xceiverClient, + public static void putKey(XceiverClientSpi xceiverClient, KeyData containerKeyData, String traceID) throws IOException { PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto .newBuilder() @@ -122,7 +122,7 @@ public static void putKey(XceiverClient xceiverClient, * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ - public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient, + public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, String key, String traceID) throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto @@ -151,7 +151,7 @@ public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient, * @param traceID container protocol call args * @throws IOException if there is an I/O error while performing the call */ - public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk, + public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, String key, ByteString data, String traceID) throws IOException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto @@ -183,7 +183,7 @@ public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk, * @param traceID - Trace ID for logging purpose. * @throws IOException */ - public static void writeSmallFile(XceiverClient client, String containerName, + public static void writeSmallFile(XceiverClientSpi client, String containerName, String key, byte[] data, String traceID) throws IOException { KeyData containerKeyData = KeyData @@ -224,7 +224,7 @@ public static void writeSmallFile(XceiverClient client, String containerName, * @param traceID - traceID * @throws IOException */ - public static void createContainer(XceiverClient client, String traceID) + public static void createContainer(XceiverClientSpi client, String traceID) throws IOException { ContainerProtos.CreateContainerRequestProto.Builder createRequest = ContainerProtos.CreateContainerRequestProto @@ -255,7 +255,7 @@ public static void createContainer(XceiverClient client, String traceID) * @return GetSmallFileResponseProto * @throws IOException */ - public static GetSmallFileResponseProto readSmallFile(XceiverClient client, + public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, String containerName, String key, String traceID) throws IOException { KeyData containerKeyData = KeyData .newBuilder() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java index 6f765e2de4..264ba4afe0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -36,7 +36,7 @@ * Creates a netty server endpoint that acts as the communication layer for * Ozone containers. */ -public final class XceiverServer { +public final class XceiverServer implements XceiverServerSpi { private final int port; private final ContainerDispatcher storageContainer; @@ -57,11 +57,7 @@ public XceiverServer(Configuration conf, this.storageContainer = dispatcher; } - /** - * Starts running the server. - * - * @throws IOException - */ + @Override public void start() throws IOException { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); @@ -75,11 +71,7 @@ public void start() throws IOException { .channel(); } - /** - * Stops a running server. - * - * @throws Exception - */ + @Override public void stop() { if (bossGroup != null) { bossGroup.shutdownGracefully(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java new file mode 100644 index 0000000000..512f8fcc1b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import java.io.IOException; + +/** A server endpoint that acts as the communication layer for Ozone containers. */ +public interface XceiverServerSpi { + /** Starts the server. */ + void start() throws IOException; + + /** Stops a running server. */ + void stop(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 0f77175c9c..e251da1577 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +52,7 @@ public class OzoneContainer { private final Configuration ozoneConfig; private final ContainerDispatcher dispatcher; private final ContainerManager manager; - private final XceiverServer server; + private final XceiverServerSpi server; private final ChunkManager chunkManager; private final KeyManager keyManager; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index 8480a885a7..ef0dc18544 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -18,20 +18,6 @@ package org.apache.hadoop.ozone.web.storage; -import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*; -import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; - -import java.io.IOException; -import java.io.OutputStream; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Set; -import java.util.TimeZone; - import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; @@ -40,7 +26,6 @@ import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.protocol.LocatedContainer; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; @@ -50,17 +35,21 @@ import org.apache.hadoop.ozone.web.handlers.ListArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.interfaces.StorageHandler; -import org.apache.hadoop.ozone.web.response.BucketInfo; -import org.apache.hadoop.ozone.web.response.KeyInfo; -import org.apache.hadoop.ozone.web.response.ListBuckets; -import org.apache.hadoop.ozone.web.response.ListKeys; -import org.apache.hadoop.ozone.web.response.ListVolumes; -import org.apache.hadoop.ozone.web.response.VolumeInfo; -import org.apache.hadoop.ozone.web.response.VolumeOwner; +import org.apache.hadoop.ozone.web.response.*; +import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.storage.ChunkInputStream; import org.apache.hadoop.scm.storage.ChunkOutputStream; import org.apache.hadoop.util.StringUtils; +import java.io.IOException; +import java.io.OutputStream; +import java.text.SimpleDateFormat; +import java.util.*; + +import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; + /** * A {@link StorageHandler} implementation that distributes object storage * across the nodes of an HDFS cluster. @@ -87,7 +76,7 @@ public DistributedStorageHandler(OzoneConfiguration conf, @Override public void createVolume(VolumeArgs args) throws IOException, OzoneException { String containerKey = buildContainerKey(args.getVolumeName()); - XceiverClient xceiverClient = acquireXceiverClient(containerKey); + XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey); try { VolumeInfo volume = new VolumeInfo(); volume.setVolumeName(args.getVolumeName()); @@ -137,7 +126,7 @@ public void deleteVolume(VolumeArgs args) public VolumeInfo getVolumeInfo(VolumeArgs args) throws IOException, OzoneException { String containerKey = buildContainerKey(args.getVolumeName()); - XceiverClient xceiverClient = acquireXceiverClient(containerKey); + XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey); try { KeyData containerKeyData = containerKeyDataForRead( xceiverClient.getPipeline().getContainerName(), containerKey); @@ -155,7 +144,7 @@ public void createBucket(final BucketArgs args) throws IOException, OzoneException { String containerKey = buildContainerKey(args.getVolumeName(), args.getBucketName()); - XceiverClient xceiverClient = acquireXceiverClient(containerKey); + XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey); try { BucketInfo bucket = new BucketInfo(); bucket.setVolumeName(args.getVolumeName()); @@ -215,7 +204,7 @@ public BucketInfo getBucketInfo(BucketArgs args) throws IOException, OzoneException { String containerKey = buildContainerKey(args.getVolumeName(), args.getBucketName()); - XceiverClient xceiverClient = acquireXceiverClient(containerKey); + XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey); try { KeyData containerKeyData = containerKeyDataForRead( xceiverClient.getPipeline().getContainerName(), containerKey); @@ -236,7 +225,7 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException, KeyInfo key = new KeyInfo(); key.setKeyName(args.getKeyName()); key.setCreatedOn(dateToString(new Date())); - XceiverClient xceiverClient = acquireXceiverClient(containerKey); + XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey); return new ChunkOutputStream(containerKey, key.getKeyName(), xceiverClientManager, xceiverClient, args.getRequestID()); } @@ -252,7 +241,7 @@ public LengthInputStream newKeyReader(KeyArgs args) throws IOException, OzoneException { String containerKey = buildContainerKey(args.getVolumeName(), args.getBucketName(), args.getKeyName()); - XceiverClient xceiverClient = acquireXceiverClient(containerKey); + XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey); boolean success = false; try { KeyData containerKeyData = containerKeyDataForRead( @@ -286,7 +275,7 @@ public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { } /** - * Acquires an {@link XceiverClient} connected to a {@link Pipeline} of nodes + * Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline} of nodes * capable of serving container protocol operations. The container is * selected based on the specified container key. * @@ -294,7 +283,7 @@ public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { * @return XceiverClient connected to a container * @throws IOException if an XceiverClient cannot be acquired */ - private XceiverClient acquireXceiverClient(String containerKey) + private XceiverClientSpi acquireXceiverClient(String containerKey) throws IOException { Set locatedContainers = storageContainerLocation.getStorageContainerLocations( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index f5871cdd10..a01edd12e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -23,8 +23,8 @@ import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.scm.storage.ContainerProtocolCalls; @@ -75,7 +75,7 @@ public void testAllocateWrite() throws Exception { String containerName = "container0"; Pipeline pipeline = storageContainerLocationClient.allocateContainer(containerName); - XceiverClient client = xceiverClientManager.acquireClient(pipeline); + XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); ContainerProtocolCalls.createContainer(client, traceID); ContainerProtocolCalls.writeSmallFile(client, containerName, @@ -93,7 +93,7 @@ public void testInvalidKeyRead() throws Exception { String containerName = "container1"; Pipeline pipeline = storageContainerLocationClient.allocateContainer(containerName); - XceiverClient client = xceiverClientManager.acquireClient(pipeline); + XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); ContainerProtocolCalls.createContainer(client, traceID); thrown.expect(StorageContainerException.class); @@ -112,7 +112,7 @@ public void testInvalidContainerRead() throws Exception { String containerName = "container2"; Pipeline pipeline = storageContainerLocationClient.allocateContainer(containerName); - XceiverClient client = xceiverClientManager.acquireClient(pipeline); + XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); ContainerProtocolCalls.createContainer(client, traceID); ContainerProtocolCalls.writeSmallFile(client, containerName, "key", "data123".getBytes(), traceID);