HDFS-11126. Ozone: Add small file support RPC. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2016-11-18 10:32:46 -08:00 committed by Owen O'Malley
parent 05b44e1ad8
commit ca12aac5a4
6 changed files with 439 additions and 33 deletions

View File

@ -25,6 +25,7 @@
import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
@ -36,14 +37,23 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutSmallFileRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetSmallFileResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetSmallFileRequestProto;
import org.apache.hadoop.scm.XceiverClient;
/**
* Implementation of all container protocol calls performed by
* .
* Implementation of all container protocol calls performed by Container
* clients.
*/
public final class ContainerProtocolCalls {
/**
* There is no need to instantiate this class.
*/
private ContainerProtocolCalls() {
}
/**
* Calls the container protocol to get a container key.
*
@ -152,6 +162,90 @@ public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk,
validateContainerResponse(response, traceID);
}
/**
* Allows writing a small file using single RPC. This takes the container
* name, key name and data to write sends all that data to the container using
* a single RPC. This API is designed to be used for files which are smaller
* than 1 MB.
*
* @param client - client that communicates with the container.
* @param containerName - Name of the container
* @param key - Name of the Key
* @param data - Data to be written into the container.
* @param traceID - Trace ID for logging purpose.
* @throws IOException
*/
public static void writeSmallFile(XceiverClient client, String containerName,
String key, byte[] data, String traceID) throws IOException {
KeyData containerKeyData = KeyData
.newBuilder()
.setContainerName(containerName)
.setName(key).build();
PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
.newBuilder()
.setPipeline(client.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
ChunkInfo chunk = ChunkInfo
.newBuilder()
.setChunkName(key + "_chunk")
.setOffset(0)
.setLen(data.length)
.build();
PutSmallFileRequestProto putSmallFileRequest = PutSmallFileRequestProto
.newBuilder().setChunkInfo(chunk)
.setKey(createKeyRequest)
.setData(ByteString.copyFrom(data))
.build();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.PutSmallFile)
.setTraceID(traceID)
.setPutSmallFile(putSmallFileRequest)
.build();
ContainerCommandResponseProto response = client.sendCommand(request);
validateContainerResponse(response, traceID);
}
/**
* Reads the data given the container name and key.
*
* @param client - client
* @param containerName - name of the container
* @param key - key
* @param traceID - trace ID
* @return GetSmallFileResponseProto
* @throws IOException
*/
public static GetSmallFileResponseProto readSmallFile(XceiverClient client,
String containerName, String key, String traceID) throws IOException {
KeyData containerKeyData = KeyData
.newBuilder()
.setContainerName(containerName)
.setName(key).build();
GetKeyRequestProto.Builder getKey = GetKeyRequestProto
.newBuilder()
.setPipeline(client.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
GetSmallFileRequestProto
.newBuilder().setKey(getKey)
.build();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetSmallFile)
.setTraceID(traceID)
.setGetSmallFile(getSmallFileRequest)
.build();
ContainerCommandResponseProto response = client.sendCommand(request);
validateContainerResponse(response, traceID);
return response.getGetSmallFile();
}
/**
* Validates a response from a container protocol call. Any non-successful
* return code is mapped to a corresponding exception and thrown.
@ -181,10 +275,4 @@ private static void validateContainerResponse(
"Unrecognized container response:" + traceID);
}
}
/**
* There is no need to instantiate this class.
*/
private ContainerProtocolCalls() {
}
}

View File

@ -86,6 +86,11 @@ enum Type {
WriteChunk = 12;
ListChunk = 13;
CompactChunk = 14;
/** Combines Key and Chunk Operation into Single RPC. */
PutSmallFile = 15;
GetSmallFile = 16;
}
@ -122,6 +127,9 @@ message ContainerCommandRequestProto {
optional WriteChunkRequestProto writeChunk = 13;
optional DeleteChunkRequestProto deleteChunk = 14;
optional ListChunkRequestProto listChunk = 15;
optional PutSmallFileRequestProto putSmallFile = 16;
optional GetSmallFileRequestProto getSmallFile = 17;
}
message ContainerCommandResponseProto {
@ -147,6 +155,9 @@ message ContainerCommandResponseProto {
required Result result = 17;
optional string message = 18;
optional PutSmallFileResponseProto putSmallFile = 19;
optional GetSmallFileResponseProto getSmallFile = 20;
}
// A pipeline is composed of one or more datanodes that back a container.
@ -318,3 +329,24 @@ message ListChunkResponseProto {
repeated ChunkInfo chunkData = 1;
}
/** For small file access combines write chunk and putKey into a single
RPC */
message PutSmallFileRequestProto {
required PutKeyRequestProto key = 1;
required ChunkInfo chunkInfo = 2;
required bytes data = 3;
}
message PutSmallFileResponseProto {
}
message GetSmallFileRequestProto {
required GetKeyRequestProto key = 1;
}
message GetSmallFileResponseProto {
required ReadChunkResponseProto data = 1;
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.helpers;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
/**
* File Utils are helper routines used by putSmallFile and getSmallFile
* RPCs.
*/
public final class FileUtils {
/**
* Never Constructed.
*/
private FileUtils() {
}
/**
* Gets a response for the putSmallFile RPC.
* @param msg - ContainerCommandRequestProto
* @return - ContainerCommandResponseProto
*/
public static ContainerProtos.ContainerCommandResponseProto
getPutFileResponse(ContainerProtos.ContainerCommandRequestProto msg) {
ContainerProtos.PutSmallFileResponseProto.Builder getResponse =
ContainerProtos.PutSmallFileResponseProto.newBuilder();
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
.SUCCESS, "");
builder.setCmdType(ContainerProtos.Type.PutSmallFile);
builder.setPutSmallFile(getResponse);
return builder.build();
}
/**
* Gets a response to the read small file call.
* @param msg - Msg
* @param data - Data
* @param info - Info
* @return Response.
*/
public static ContainerProtos.ContainerCommandResponseProto
getGetSmallFileResponse(ContainerProtos.ContainerCommandRequestProto msg,
byte[] data, ChunkInfo info) {
Preconditions.checkNotNull(msg);
ContainerProtos.ReadChunkResponseProto.Builder readChunkresponse =
ContainerProtos.ReadChunkResponseProto.newBuilder();
readChunkresponse.setChunkData(info.getProtoBufMessage());
readChunkresponse.setData(ByteString.copyFrom(data));
readChunkresponse.setPipeline(msg.getGetSmallFile().getKey().getPipeline());
ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile =
ContainerProtos.GetSmallFileResponseProto.newBuilder();
getSmallFile.setData(readChunkresponse.build());
ContainerProtos.ContainerCommandResponseProto.Builder builder =
ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
.SUCCESS, "");
builder.setCmdType(ContainerProtos.Type.GetSmallFile);
builder.setGetSmallFile(getSmallFile);
return builder.build();
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandRequestProto;
@ -29,15 +30,18 @@
import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.FileUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
/**
* Ozone Container dispatcher takes a call from the netty server and routes it
@ -69,7 +73,6 @@ public ContainerCommandResponseProto dispatch(
(cmdType == Type.ReadContainer) ||
(cmdType == Type.ListContainer) ||
(cmdType == Type.UpdateContainer)) {
return containerProcessHandler(msg);
}
@ -86,6 +89,11 @@ public ContainerCommandResponseProto dispatch(
return chunkProcessHandler(msg);
}
if ((cmdType == Type.PutSmallFile) ||
(cmdType == Type.GetSmallFile)) {
return smallFileHandler(msg);
}
return ContainerUtils.unsupportedRequest(msg);
}
@ -219,6 +227,18 @@ private ContainerCommandResponseProto chunkProcessHandler(
}
}
private ContainerCommandResponseProto smallFileHandler(
ContainerCommandRequestProto msg) throws IOException {
switch (msg.getCmdType()) {
case PutSmallFile:
return handlePutSmallFile(msg);
case GetSmallFile:
return handleGetSmallFile(msg);
default:
return ContainerUtils.unsupportedRequest(msg);
}
}
/**
* Calls into container logic and returns appropriate response.
*
@ -387,7 +407,7 @@ private ContainerCommandResponseProto handleDeleteChunk(
*/
private ContainerCommandResponseProto handlePutKey(
ContainerCommandRequestProto msg) throws IOException {
if(!msg.hasPutKey()){
if (!msg.hasPutKey()) {
LOG.debug("Malformed put key request. trace ID: {}",
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
@ -409,7 +429,7 @@ private ContainerCommandResponseProto handlePutKey(
*/
private ContainerCommandResponseProto handleGetKey(
ContainerCommandRequestProto msg) throws IOException {
if(!msg.hasGetKey()){
if (!msg.hasGetKey()) {
LOG.debug("Malformed get key request. trace ID: {}",
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
@ -430,7 +450,7 @@ private ContainerCommandResponseProto handleGetKey(
*/
private ContainerCommandResponseProto handleDeleteKey(
ContainerCommandRequestProto msg) throws IOException {
if(!msg.hasDeleteKey()){
if (!msg.hasDeleteKey()) {
LOG.debug("Malformed delete key request. trace ID: {}",
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
@ -447,4 +467,73 @@ private ContainerCommandResponseProto handleDeleteKey(
return KeyUtils.getKeyResponse(msg);
}
/**
* Handles writing a chunk and associated key using single RPC.
*
* @param msg - Message.
* @return ContainerCommandResponseProto
* @throws IOException
*/
private ContainerCommandResponseProto handlePutSmallFile(
ContainerCommandRequestProto msg) throws IOException {
if (!msg.hasPutSmallFile()) {
LOG.debug("Malformed put small file request. trace ID: {}",
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
Pipeline pipeline =
Pipeline.getFromProtoBuf(msg.getPutSmallFile().getKey().getPipeline());
Preconditions.checkNotNull(pipeline);
KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey()
.getKeyData());
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile()
.getChunkInfo());
byte[] data = msg.getPutSmallFile().getData().toByteArray();
this.containerManager.getChunkManager().writeChunk(pipeline, keyData
.getKeyName(), chunkInfo, data);
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
chunks.add(chunkInfo.getProtoBufMessage());
keyData.setChunks(chunks);
this.containerManager.getKeyManager().putKey(pipeline, keyData);
return FileUtils.getPutFileResponse(msg);
}
/**
* Handles getting a data stream using a key. This helps in reducing the RPC
* overhead for small files.
*
* @param msg - ContainerCommandRequestProto
* @return ContainerCommandResponseProto
*/
private ContainerCommandResponseProto handleGetSmallFile(
ContainerCommandRequestProto msg) throws IOException {
ByteString dataBuf = ByteString.EMPTY;
if (!msg.hasGetSmallFile()) {
LOG.debug("Malformed get small file request. trace ID: {}",
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
Pipeline pipeline =
Pipeline.getFromProtoBuf(msg.getGetSmallFile().getKey().getPipeline());
Preconditions.checkNotNull(pipeline);
KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
.getKey().getKeyData());
KeyData data = this.containerManager.getKeyManager().getKey(keyData);
ContainerProtos.ChunkInfo c = null;
for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
ByteString current =
ByteString.copyFrom(this.containerManager.getChunkManager().readChunk(
pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf(
chunk)));
dataBuf = dataBuf.concat(current);
c = chunk;
}
return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
ChunkInfo.getFromProtoBuf(c));
}
}

View File

@ -72,12 +72,12 @@ public static Pipeline createSingleNodePipeline(String containerName) throws
* Creates a ChunkInfo for testing.
*
* @param keyName - Name of the key
* @param seqNo - Chunk number.
* @param seqNo - Chunk number.
* @return ChunkInfo
* @throws IOException
*/
public static ChunkInfo getChunk(String keyName, int seqNo, long offset,
long len) throws IOException {
long len) throws IOException {
ChunkInfo info = new ChunkInfo(String.format("%s.data.%d", keyName,
seqNo), offset, len);
@ -113,17 +113,17 @@ public static void setDataChecksum(ChunkInfo info, byte[] data)
/**
* Returns a writeChunk Request.
*
* @param containerName - Name
* @param keyName - Name
* @param datalen - data len.
* @return Request.
* @param pipeline - A set of machines where this container lives.
* @param containerName - Name of the container.
* @param keyName - Name of the Key this chunk is part of.
* @param datalen - Length of data.
* @return ContainerCommandRequestProto
* @throws IOException
* @throws NoSuchAlgorithmException
*/
public static ContainerCommandRequestProto getWriteChunkRequest(
Pipeline pipeline, String containerName, String keyName, int datalen)
throws
IOException, NoSuchAlgorithmException {
throws IOException, NoSuchAlgorithmException {
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
ContainerProtos.WriteChunkRequestProto
.newBuilder();
@ -146,6 +146,65 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
return request.build();
}
/**
* Returns PutSmallFile Request that we can send to the container.
*
* @param pipeline - Pipeline
* @param containerName - ContainerName.
* @param keyName - KeyName
* @param dataLen - Number of bytes in the data
* @return ContainerCommandRequestProto
*/
public static ContainerCommandRequestProto getWriteSmallFileRequest(
Pipeline pipeline, String containerName, String keyName, int dataLen)
throws Exception {
ContainerProtos.PutSmallFileRequestProto.Builder smallFileRequest =
ContainerProtos.PutSmallFileRequestProto.newBuilder();
pipeline.setContainerName(containerName);
byte[] data = getData(dataLen);
ChunkInfo info = getChunk(keyName, 0, 0, dataLen);
setDataChecksum(info, data);
ContainerProtos.PutKeyRequestProto.Builder putRequest =
ContainerProtos.PutKeyRequestProto.newBuilder();
putRequest.setPipeline(pipeline.getProtobufMessage());
KeyData keyData = new KeyData(containerName, keyName);
List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
newList.add(info.getProtoBufMessage());
keyData.setChunks(newList);
putRequest.setKeyData(keyData.getProtoBufMessage());
smallFileRequest.setChunkInfo(info.getProtoBufMessage());
smallFileRequest.setData(ByteString.copyFrom(data));
smallFileRequest.setKey(putRequest);
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.PutSmallFile);
request.setPutSmallFile(smallFileRequest);
return request.build();
}
public static ContainerCommandRequestProto getReadSmallFileRequest(
ContainerProtos.PutKeyRequestProto putKey)
throws Exception {
ContainerProtos.GetSmallFileRequestProto.Builder smallFileRequest =
ContainerProtos.GetSmallFileRequestProto.newBuilder();
ContainerCommandRequestProto getKey = getKeyRequest(putKey);
smallFileRequest.setKey(getKey.getGetKey());
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.GetSmallFile);
request.setGetSmallFile(smallFileRequest);
return request.build();
}
/**
* Returns a read Request.
*
@ -156,8 +215,7 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
*/
public static ContainerCommandRequestProto getReadChunkRequest(
ContainerProtos.WriteChunkRequestProto request)
throws
IOException, NoSuchAlgorithmException {
throws IOException, NoSuchAlgorithmException {
ContainerProtos.ReadChunkRequestProto.Builder readRequest =
ContainerProtos.ReadChunkRequestProto.newBuilder();
@ -298,23 +356,22 @@ public static ContainerCommandRequestProto getKeyRequest(
}
/**
* Verify the response against the request.
* @param request - Request
* @param response - Response
* Verify the response against the request.
*
* @param request - Request
* @param response - Response
*/
public static void verifyGetKey(ContainerCommandRequestProto request,
ContainerCommandResponseProto response) {
ContainerCommandResponseProto response) {
Assert.assertEquals(request.getTraceID(), response.getTraceID());
Assert.assertEquals(response.getResult(), ContainerProtos.Result.SUCCESS);
ContainerProtos.PutKeyRequestProto putKey = request.getPutKey();
ContainerProtos. GetKeyRequestProto getKey = request.getGetKey();
ContainerProtos.GetKeyRequestProto getKey = request.getGetKey();
Assert.assertEquals(putKey.getKeyData().getChunksCount(),
getKey.getKeyData().getChunksCount());
getKey.getKeyData().getChunksCount());
}
/**
*
* @param putKeyRequest - putKeyRequest.
* @return - Request
*/

View File

@ -17,14 +17,15 @@
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -168,4 +169,62 @@ public void testOzoneContainerViaDataNode() throws Exception {
}
@Test
public void testBothGetandPutSmallFile() throws Exception {
String keyName = OzoneUtils.getRequestID();
String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
// Start ozone container Via Datanode create.
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("local").build();
// This client talks to ozone container via datanode.
XceiverClient client = new XceiverClient(pipeline, conf);
client.connect();
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
ContainerProtos.ContainerCommandRequestProto smallFileRequest =
ContainerTestHelper.getWriteSmallFileRequest(pipeline, containerName,
keyName, 1024);
response = client.sendCommand(smallFileRequest);
Assert.assertNotNull(response);
Assert.assertTrue(smallFileRequest.getTraceID()
.equals(response.getTraceID()));
ContainerProtos.ContainerCommandRequestProto getSmallFileRequest =
ContainerTestHelper.getReadSmallFileRequest(smallFileRequest
.getPutSmallFile().getKey());
response = client.sendCommand(getSmallFileRequest);
Assert.assertArrayEquals(
smallFileRequest.getPutSmallFile().getData().toByteArray(),
response.getGetSmallFile().getData().getData().toByteArray());
cluster.shutdown();
}
}