HDFS-11513. Ozone: Separate XceiverServer and XceiverClient into interfaces and implementations.

This commit is contained in:
Tsz-Wo Nicholas Sze 2017-03-09 11:03:20 -08:00 committed by Owen O'Malley
parent 1058aa2523
commit 7b7d186b82
12 changed files with 140 additions and 88 deletions

View File

@ -33,14 +33,13 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* A Client for the storageContainer protocol. * A Client for the storageContainer protocol.
*/ */
public class XceiverClient implements Closeable { public class XceiverClient implements XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
private final Pipeline pipeline; private final Pipeline pipeline;
private final Configuration config; private final Configuration config;
@ -61,9 +60,7 @@ public XceiverClient(Pipeline pipeline, Configuration config) {
this.config = config; this.config = config;
} }
/** @Override
* Connects to the leader in the pipeline.
*/
public void connect() throws Exception { public void connect() throws Exception {
if (channelFuture != null if (channelFuture != null
&& channelFuture.channel() != null && channelFuture.channel() != null
@ -90,9 +87,6 @@ public void connect() throws Exception {
channelFuture = b.connect(leader.getHostName(), port).sync(); channelFuture = b.connect(leader.getHostName(), port).sync();
} }
/**
* Close the client.
*/
@Override @Override
public void close() { public void close() {
if(group != null) { if(group != null) {
@ -104,22 +98,12 @@ public void close() {
} }
} }
/** @Override
* Returns the pipeline of machines that host the container used by this
* client.
*
* @return pipeline of machines that host the container
*/
public Pipeline getPipeline() { public Pipeline getPipeline() {
return pipeline; return pipeline;
} }
/** @Override
* Sends a given command to server and gets the reply back.
* @param request Request
* @return Response to the command
* @throws IOException
*/
public ContainerProtos.ContainerCommandResponseProto sendCommand( public ContainerProtos.ContainerCommandResponseProto sendCommand(
ContainerProtos.ContainerCommandRequestProto request) ContainerProtos.ContainerCommandRequestProto request)
throws IOException { throws IOException {

View File

@ -96,7 +96,7 @@ public void onRemoval(
* @return XceiverClient connected to a container * @return XceiverClient connected to a container
* @throws IOException if an XceiverClient cannot be acquired * @throws IOException if an XceiverClient cannot be acquired
*/ */
public XceiverClient acquireClient(Pipeline pipeline) throws IOException { public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException {
Preconditions.checkNotNull(pipeline); Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null); Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
@ -109,7 +109,7 @@ public XceiverClient acquireClient(Pipeline pipeline) throws IOException {
return info.getXceiverClient(); return info.getXceiverClient();
} else { } else {
// connection not found, create new, add reference and return // connection not found, create new, add reference and return
XceiverClient xceiverClient = new XceiverClient(pipeline, conf); XceiverClientSpi xceiverClient = new XceiverClient(pipeline, conf);
try { try {
xceiverClient.connect(); xceiverClient.connect();
} catch (Exception e) { } catch (Exception e) {
@ -129,7 +129,7 @@ public XceiverClient acquireClient(Pipeline pipeline) throws IOException {
* *
* @param xceiverClient client to release * @param xceiverClient client to release
*/ */
public void releaseClient(XceiverClient xceiverClient) { public void releaseClient(XceiverClientSpi xceiverClient) {
Preconditions.checkNotNull(xceiverClient); Preconditions.checkNotNull(xceiverClient);
String containerName = xceiverClient.getPipeline().getContainerName(); String containerName = xceiverClient.getPipeline().getContainerName();
XceiverClientWithAccessInfo info; XceiverClientWithAccessInfo info;
@ -147,10 +147,10 @@ public void releaseClient(XceiverClient xceiverClient) {
* - a reference count, +1 when acquire, -1 when release * - a reference count, +1 when acquire, -1 when release
*/ */
private static class XceiverClientWithAccessInfo { private static class XceiverClientWithAccessInfo {
final private XceiverClient xceiverClient; final private XceiverClientSpi xceiverClient;
final private AtomicInteger referenceCount; final private AtomicInteger referenceCount;
XceiverClientWithAccessInfo(XceiverClient xceiverClient) { XceiverClientWithAccessInfo(XceiverClientSpi xceiverClient) {
this.xceiverClient = xceiverClient; this.xceiverClient = xceiverClient;
this.referenceCount = new AtomicInteger(0); this.referenceCount = new AtomicInteger(0);
} }
@ -167,7 +167,7 @@ boolean hasRefence() {
return this.referenceCount.get() != 0; return this.referenceCount.get() != 0;
} }
XceiverClient getXceiverClient() { XceiverClientSpi getXceiverClient() {
return xceiverClient; return xceiverClient;
} }
} }

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
import java.io.IOException;
/**
* A Client for the storageContainer protocol.
*/
public interface XceiverClientSpi extends Closeable {
/**
* Connects to the leader in the pipeline.
*/
void connect() throws Exception;
@Override
void close();
/**
* Returns the pipeline of machines that host the container used by this
* client.
*
* @return pipeline of machines that host the container
*/
Pipeline getPipeline();
/**
* Sends a given command to server and gets the reply back.
* @param request Request
* @return Response to the command
* @throws IOException
*/
ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException;
}

View File

@ -17,8 +17,8 @@
*/ */
package org.apache.hadoop.scm.client; package org.apache.hadoop.scm.client;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
@ -75,7 +75,7 @@ public static void setContainerSizeB(long size) {
@Override @Override
public Pipeline createContainer(String containerId) public Pipeline createContainer(String containerId)
throws IOException { throws IOException {
XceiverClient client = null; XceiverClientSpi client = null;
try { try {
Pipeline pipeline = Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerId); storageContainerLocationClient.allocateContainer(containerId);

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientManager;
/** /**
@ -47,7 +47,7 @@ public class ChunkInputStream extends InputStream {
private final String key; private final String key;
private final String traceID; private final String traceID;
private XceiverClientManager xceiverClientManager; private XceiverClientManager xceiverClientManager;
private XceiverClient xceiverClient; private XceiverClientSpi xceiverClient;
private List<ChunkInfo> chunks; private List<ChunkInfo> chunks;
private int chunkOffset; private int chunkOffset;
private List<ByteBuffer> buffers; private List<ByteBuffer> buffers;
@ -63,7 +63,7 @@ public class ChunkInputStream extends InputStream {
* @param traceID container protocol call traceID * @param traceID container protocol call traceID
*/ */
public ChunkInputStream(String key, XceiverClientManager xceiverClientManager, public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
XceiverClient xceiverClient, List<ChunkInfo> chunks, String traceID) { XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
this.key = key; this.key = key;
this.traceID = traceID; this.traceID = traceID;
this.xceiverClientManager = xceiverClientManager; this.xceiverClientManager = xceiverClientManager;

View File

@ -32,8 +32,8 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
/** /**
* An {@link OutputStream} used by the REST service in combination with the * An {@link OutputStream} used by the REST service in combination with the
@ -58,7 +58,7 @@ public class ChunkOutputStream extends OutputStream {
private final String traceID; private final String traceID;
private final KeyData.Builder containerKeyData; private final KeyData.Builder containerKeyData;
private XceiverClientManager xceiverClientManager; private XceiverClientManager xceiverClientManager;
private XceiverClient xceiverClient; private XceiverClientSpi xceiverClient;
private ByteBuffer buffer; private ByteBuffer buffer;
private final String streamId; private final String streamId;
private int chunkIndex; private int chunkIndex;
@ -73,7 +73,7 @@ public class ChunkOutputStream extends OutputStream {
* @param traceID container protocol call args * @param traceID container protocol call args
*/ */
public ChunkOutputStream(String containerKey, String key, public ChunkOutputStream(String containerKey, String key,
XceiverClientManager xceiverClientManager, XceiverClient xceiverClient, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
String traceID) { String traceID) {
this.containerKey = containerKey; this.containerKey = containerKey;
this.key = key; this.key = key;

View File

@ -45,10 +45,10 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.WriteChunkRequestProto; .WriteChunkRequestProto;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.scm.XceiverClientSpi;
/** /**
* Implementation of all container protocol calls performed by Container * Implementation of all container protocol calls performed by Container
@ -71,7 +71,7 @@ private ContainerProtocolCalls() {
* @return container protocol get key response * @return container protocol get key response
* @throws IOException if there is an I/O error while performing the call * @throws IOException if there is an I/O error while performing the call
*/ */
public static GetKeyResponseProto getKey(XceiverClient xceiverClient, public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient,
KeyData containerKeyData, String traceID) throws IOException { KeyData containerKeyData, String traceID) throws IOException {
GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
.newBuilder() .newBuilder()
@ -96,7 +96,7 @@ public static GetKeyResponseProto getKey(XceiverClient xceiverClient,
* @param traceID container protocol call args * @param traceID container protocol call args
* @throws IOException if there is an I/O error while performing the call * @throws IOException if there is an I/O error while performing the call
*/ */
public static void putKey(XceiverClient xceiverClient, public static void putKey(XceiverClientSpi xceiverClient,
KeyData containerKeyData, String traceID) throws IOException { KeyData containerKeyData, String traceID) throws IOException {
PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
.newBuilder() .newBuilder()
@ -122,7 +122,7 @@ public static void putKey(XceiverClient xceiverClient,
* @return container protocol read chunk response * @return container protocol read chunk response
* @throws IOException if there is an I/O error while performing the call * @throws IOException if there is an I/O error while performing the call
*/ */
public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient, public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
ChunkInfo chunk, String key, String traceID) ChunkInfo chunk, String key, String traceID)
throws IOException { throws IOException {
ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
@ -151,7 +151,7 @@ public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient,
* @param traceID container protocol call args * @param traceID container protocol call args
* @throws IOException if there is an I/O error while performing the call * @throws IOException if there is an I/O error while performing the call
*/ */
public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk, public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
String key, ByteString data, String traceID) String key, ByteString data, String traceID)
throws IOException { throws IOException {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
@ -183,7 +183,7 @@ public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk,
* @param traceID - Trace ID for logging purpose. * @param traceID - Trace ID for logging purpose.
* @throws IOException * @throws IOException
*/ */
public static void writeSmallFile(XceiverClient client, String containerName, public static void writeSmallFile(XceiverClientSpi client, String containerName,
String key, byte[] data, String traceID) throws IOException { String key, byte[] data, String traceID) throws IOException {
KeyData containerKeyData = KeyData KeyData containerKeyData = KeyData
@ -224,7 +224,7 @@ public static void writeSmallFile(XceiverClient client, String containerName,
* @param traceID - traceID * @param traceID - traceID
* @throws IOException * @throws IOException
*/ */
public static void createContainer(XceiverClient client, String traceID) public static void createContainer(XceiverClientSpi client, String traceID)
throws IOException { throws IOException {
ContainerProtos.CreateContainerRequestProto.Builder createRequest = ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto ContainerProtos.CreateContainerRequestProto
@ -255,7 +255,7 @@ public static void createContainer(XceiverClient client, String traceID)
* @return GetSmallFileResponseProto * @return GetSmallFileResponseProto
* @throws IOException * @throws IOException
*/ */
public static GetSmallFileResponseProto readSmallFile(XceiverClient client, public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
String containerName, String key, String traceID) throws IOException { String containerName, String key, String traceID) throws IOException {
KeyData containerKeyData = KeyData KeyData containerKeyData = KeyData
.newBuilder() .newBuilder()

View File

@ -36,7 +36,7 @@
* Creates a netty server endpoint that acts as the communication layer for * Creates a netty server endpoint that acts as the communication layer for
* Ozone containers. * Ozone containers.
*/ */
public final class XceiverServer { public final class XceiverServer implements XceiverServerSpi {
private final int port; private final int port;
private final ContainerDispatcher storageContainer; private final ContainerDispatcher storageContainer;
@ -57,11 +57,7 @@ public XceiverServer(Configuration conf,
this.storageContainer = dispatcher; this.storageContainer = dispatcher;
} }
/** @Override
* Starts running the server.
*
* @throws IOException
*/
public void start() throws IOException { public void start() throws IOException {
bossGroup = new NioEventLoopGroup(); bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup();
@ -75,11 +71,7 @@ public void start() throws IOException {
.channel(); .channel();
} }
/** @Override
* Stops a running server.
*
* @throws Exception
*/
public void stop() { public void stop() {
if (bossGroup != null) { if (bossGroup != null) {
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully();

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server;
import java.io.IOException;
/** A server endpoint that acts as the communication layer for Ozone containers. */
public interface XceiverServerSpi {
/** Starts the server. */
void start() throws IOException;
/** Stops a running server. */
void stop();
}

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -51,7 +52,7 @@ public class OzoneContainer {
private final Configuration ozoneConfig; private final Configuration ozoneConfig;
private final ContainerDispatcher dispatcher; private final ContainerDispatcher dispatcher;
private final ContainerManager manager; private final ContainerManager manager;
private final XceiverServer server; private final XceiverServerSpi server;
private final ChunkManager chunkManager; private final ChunkManager chunkManager;
private final KeyManager keyManager; private final KeyManager keyManager;

View File

@ -18,20 +18,6 @@
package org.apache.hadoop.ozone.web.storage; package org.apache.hadoop.ozone.web.storage;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*;
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.TimeZone;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
@ -40,7 +26,6 @@
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.protocol.LocatedContainer; import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
@ -50,17 +35,21 @@
import org.apache.hadoop.ozone.web.handlers.ListArgs; import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.response.BucketInfo; import org.apache.hadoop.ozone.web.response.*;
import org.apache.hadoop.ozone.web.response.KeyInfo; import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.response.VolumeOwner;
import org.apache.hadoop.scm.storage.ChunkInputStream; import org.apache.hadoop.scm.storage.ChunkInputStream;
import org.apache.hadoop.scm.storage.ChunkOutputStream; import org.apache.hadoop.scm.storage.ChunkOutputStream;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.*;
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
/** /**
* A {@link StorageHandler} implementation that distributes object storage * A {@link StorageHandler} implementation that distributes object storage
* across the nodes of an HDFS cluster. * across the nodes of an HDFS cluster.
@ -87,7 +76,7 @@ public DistributedStorageHandler(OzoneConfiguration conf,
@Override @Override
public void createVolume(VolumeArgs args) throws IOException, OzoneException { public void createVolume(VolumeArgs args) throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName()); String containerKey = buildContainerKey(args.getVolumeName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey); XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
try { try {
VolumeInfo volume = new VolumeInfo(); VolumeInfo volume = new VolumeInfo();
volume.setVolumeName(args.getVolumeName()); volume.setVolumeName(args.getVolumeName());
@ -137,7 +126,7 @@ public void deleteVolume(VolumeArgs args)
public VolumeInfo getVolumeInfo(VolumeArgs args) public VolumeInfo getVolumeInfo(VolumeArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName()); String containerKey = buildContainerKey(args.getVolumeName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey); XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
try { try {
KeyData containerKeyData = containerKeyDataForRead( KeyData containerKeyData = containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey); xceiverClient.getPipeline().getContainerName(), containerKey);
@ -155,7 +144,7 @@ public void createBucket(final BucketArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName(), String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName()); args.getBucketName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey); XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
try { try {
BucketInfo bucket = new BucketInfo(); BucketInfo bucket = new BucketInfo();
bucket.setVolumeName(args.getVolumeName()); bucket.setVolumeName(args.getVolumeName());
@ -215,7 +204,7 @@ public BucketInfo getBucketInfo(BucketArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName(), String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName()); args.getBucketName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey); XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
try { try {
KeyData containerKeyData = containerKeyDataForRead( KeyData containerKeyData = containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey); xceiverClient.getPipeline().getContainerName(), containerKey);
@ -236,7 +225,7 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException,
KeyInfo key = new KeyInfo(); KeyInfo key = new KeyInfo();
key.setKeyName(args.getKeyName()); key.setKeyName(args.getKeyName());
key.setCreatedOn(dateToString(new Date())); key.setCreatedOn(dateToString(new Date()));
XceiverClient xceiverClient = acquireXceiverClient(containerKey); XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
return new ChunkOutputStream(containerKey, key.getKeyName(), return new ChunkOutputStream(containerKey, key.getKeyName(),
xceiverClientManager, xceiverClient, args.getRequestID()); xceiverClientManager, xceiverClient, args.getRequestID());
} }
@ -252,7 +241,7 @@ public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
OzoneException { OzoneException {
String containerKey = buildContainerKey(args.getVolumeName(), String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName(), args.getKeyName()); args.getBucketName(), args.getKeyName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey); XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
boolean success = false; boolean success = false;
try { try {
KeyData containerKeyData = containerKeyDataForRead( KeyData containerKeyData = containerKeyDataForRead(
@ -286,7 +275,7 @@ public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
} }
/** /**
* Acquires an {@link XceiverClient} connected to a {@link Pipeline} of nodes * Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline} of nodes
* capable of serving container protocol operations. The container is * capable of serving container protocol operations. The container is
* selected based on the specified container key. * selected based on the specified container key.
* *
@ -294,7 +283,7 @@ public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
* @return XceiverClient connected to a container * @return XceiverClient connected to a container
* @throws IOException if an XceiverClient cannot be acquired * @throws IOException if an XceiverClient cannot be acquired
*/ */
private XceiverClient acquireXceiverClient(String containerKey) private XceiverClientSpi acquireXceiverClient(String containerKey)
throws IOException { throws IOException {
Set<LocatedContainer> locatedContainers = Set<LocatedContainer> locatedContainers =
storageContainerLocation.getStorageContainerLocations( storageContainerLocation.getStorageContainerLocations(

View File

@ -23,8 +23,8 @@
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.scm.protocolPB import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB; .StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
@ -75,7 +75,7 @@ public void testAllocateWrite() throws Exception {
String containerName = "container0"; String containerName = "container0";
Pipeline pipeline = Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerName); storageContainerLocationClient.allocateContainer(containerName);
XceiverClient client = xceiverClientManager.acquireClient(pipeline); XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID); ContainerProtocolCalls.createContainer(client, traceID);
ContainerProtocolCalls.writeSmallFile(client, containerName, ContainerProtocolCalls.writeSmallFile(client, containerName,
@ -93,7 +93,7 @@ public void testInvalidKeyRead() throws Exception {
String containerName = "container1"; String containerName = "container1";
Pipeline pipeline = Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerName); storageContainerLocationClient.allocateContainer(containerName);
XceiverClient client = xceiverClientManager.acquireClient(pipeline); XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID); ContainerProtocolCalls.createContainer(client, traceID);
thrown.expect(StorageContainerException.class); thrown.expect(StorageContainerException.class);
@ -112,7 +112,7 @@ public void testInvalidContainerRead() throws Exception {
String containerName = "container2"; String containerName = "container2";
Pipeline pipeline = Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerName); storageContainerLocationClient.allocateContainer(containerName);
XceiverClient client = xceiverClientManager.acquireClient(pipeline); XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID); ContainerProtocolCalls.createContainer(client, traceID);
ContainerProtocolCalls.writeSmallFile(client, containerName, ContainerProtocolCalls.writeSmallFile(client, containerName,
"key", "data123".getBytes(), traceID); "key", "data123".getBytes(), traceID);