HDFS-12238. Ozone: Add valid trace ID check in sendCommandAsync. Contributed by Ajay Kumar.
This commit is contained in:
parent
f9bce29dcc
commit
3a661b7f82
@ -21,6 +21,7 @@
|
|||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||||
.ContainerCommandResponseProto;
|
.ContainerCommandResponseProto;
|
||||||
@ -124,6 +125,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
|||||||
*/
|
*/
|
||||||
public CompletableFuture<ContainerCommandResponseProto>
|
public CompletableFuture<ContainerCommandResponseProto>
|
||||||
sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) {
|
sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) {
|
||||||
|
|
||||||
|
// Throw an exception of request doesn't have traceId
|
||||||
|
if(StringUtils.isEmpty(request.getTraceID())) {
|
||||||
|
throw new IllegalArgumentException("Invalid trace ID");
|
||||||
|
}
|
||||||
|
|
||||||
CompletableFuture<ContainerCommandResponseProto> response =
|
CompletableFuture<ContainerCommandResponseProto> response =
|
||||||
new CompletableFuture<>();
|
new CompletableFuture<>();
|
||||||
|
|
||||||
|
@ -206,6 +206,8 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
|
|||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
request.setCmdType(ContainerProtos.Type.WriteChunk);
|
request.setCmdType(ContainerProtos.Type.WriteChunk);
|
||||||
request.setWriteChunk(writeRequest);
|
request.setWriteChunk(writeRequest);
|
||||||
|
request.setTraceID(UUID.randomUUID().toString());
|
||||||
|
|
||||||
return request.build();
|
return request.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -266,6 +268,7 @@ public static ContainerCommandRequestProto getReadSmallFileRequest(
|
|||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
request.setCmdType(ContainerProtos.Type.GetSmallFile);
|
request.setCmdType(ContainerProtos.Type.GetSmallFile);
|
||||||
request.setGetSmallFile(smallFileRequest);
|
request.setGetSmallFile(smallFileRequest);
|
||||||
|
request.setTraceID(UUID.randomUUID().toString());
|
||||||
return request.build();
|
return request.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -295,6 +298,7 @@ public static ContainerCommandRequestProto getReadChunkRequest(
|
|||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
newRequest.setCmdType(ContainerProtos.Type.ReadChunk);
|
newRequest.setCmdType(ContainerProtos.Type.ReadChunk);
|
||||||
newRequest.setReadChunk(readRequest);
|
newRequest.setReadChunk(readRequest);
|
||||||
|
newRequest.setTraceID(UUID.randomUUID().toString());
|
||||||
return newRequest.build();
|
return newRequest.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -325,6 +329,7 @@ public static ContainerCommandRequestProto getDeleteChunkRequest(
|
|||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
request.setCmdType(ContainerProtos.Type.DeleteChunk);
|
request.setCmdType(ContainerProtos.Type.DeleteChunk);
|
||||||
request.setDeleteChunk(deleteRequest);
|
request.setDeleteChunk(deleteRequest);
|
||||||
|
request.setTraceID(UUID.randomUUID().toString());
|
||||||
return request.build();
|
return request.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -353,6 +358,8 @@ public static ContainerCommandRequestProto getCreateContainerRequest(
|
|||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
request.setCmdType(ContainerProtos.Type.CreateContainer);
|
request.setCmdType(ContainerProtos.Type.CreateContainer);
|
||||||
request.setCreateContainer(createRequest);
|
request.setCreateContainer(createRequest);
|
||||||
|
request.setTraceID(UUID.randomUUID().toString());
|
||||||
|
|
||||||
return request.build();
|
return request.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -391,6 +398,7 @@ public static ContainerCommandRequestProto getUpdateContainerRequest(
|
|||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
request.setCmdType(ContainerProtos.Type.UpdateContainer);
|
request.setCmdType(ContainerProtos.Type.UpdateContainer);
|
||||||
request.setUpdateContainer(updateRequestBuilder.build());
|
request.setUpdateContainer(updateRequestBuilder.build());
|
||||||
|
request.setTraceID(UUID.randomUUID().toString());
|
||||||
return request.build();
|
return request.build();
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -439,6 +447,7 @@ public static ContainerCommandRequestProto getPutKeyRequest(
|
|||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
request.setCmdType(ContainerProtos.Type.PutKey);
|
request.setCmdType(ContainerProtos.Type.PutKey);
|
||||||
request.setPutKey(putRequest);
|
request.setPutKey(putRequest);
|
||||||
|
request.setTraceID(UUID.randomUUID().toString());
|
||||||
return request.build();
|
return request.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -466,6 +475,7 @@ public static ContainerCommandRequestProto getKeyRequest(
|
|||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
request.setCmdType(ContainerProtos.Type.GetKey);
|
request.setCmdType(ContainerProtos.Type.GetKey);
|
||||||
request.setGetKey(getRequest);
|
request.setGetKey(getRequest);
|
||||||
|
request.setTraceID(UUID.randomUUID().toString());
|
||||||
return request.build();
|
return request.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -502,6 +512,7 @@ public static ContainerCommandRequestProto getDeleteKeyRequest(
|
|||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
request.setCmdType(ContainerProtos.Type.DeleteKey);
|
request.setCmdType(ContainerProtos.Type.DeleteKey);
|
||||||
request.setDeleteKey(delRequest);
|
request.setDeleteKey(delRequest);
|
||||||
|
request.setTraceID(UUID.randomUUID().toString());
|
||||||
return request.build();
|
return request.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -513,13 +524,33 @@ public static ContainerCommandRequestProto getDeleteKeyRequest(
|
|||||||
public static ContainerCommandRequestProto getCloseContainer(
|
public static ContainerCommandRequestProto getCloseContainer(
|
||||||
Pipeline pipeline) {
|
Pipeline pipeline) {
|
||||||
Preconditions.checkNotNull(pipeline);
|
Preconditions.checkNotNull(pipeline);
|
||||||
ContainerProtos.CloseContainerRequestProto closeReqeuest =
|
ContainerProtos.CloseContainerRequestProto closeRequest =
|
||||||
ContainerProtos.CloseContainerRequestProto.newBuilder().setPipeline(
|
ContainerProtos.CloseContainerRequestProto.newBuilder().setPipeline(
|
||||||
pipeline.getProtobufMessage()).build();
|
pipeline.getProtobufMessage()).build();
|
||||||
ContainerProtos.ContainerCommandRequestProto cmd =
|
ContainerProtos.ContainerCommandRequestProto cmd =
|
||||||
ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
|
ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
|
||||||
.Type.CloseContainer).setCloseContainer(closeReqeuest)
|
.Type.CloseContainer).setCloseContainer(closeRequest)
|
||||||
|
.setTraceID(UUID.randomUUID().toString())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
return cmd;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a simple request without traceId.
|
||||||
|
* @param pipeline - pipeline
|
||||||
|
* @return ContainerCommandRequestProto without traceId.
|
||||||
|
*/
|
||||||
|
public static ContainerCommandRequestProto getRequestWithoutTraceId(
|
||||||
|
Pipeline pipeline) {
|
||||||
|
Preconditions.checkNotNull(pipeline);
|
||||||
|
ContainerProtos.CloseContainerRequestProto closeRequest =
|
||||||
|
ContainerProtos.CloseContainerRequestProto.newBuilder().setPipeline(
|
||||||
|
pipeline.getProtobufMessage()).build();
|
||||||
|
ContainerProtos.ContainerCommandRequestProto cmd =
|
||||||
|
ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
|
||||||
|
.Type.CloseContainer).setCloseContainer(closeRequest)
|
||||||
|
.build();
|
||||||
return cmd;
|
return cmd;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -538,6 +569,7 @@ public static ContainerCommandRequestProto getDeleteContainer(
|
|||||||
return ContainerCommandRequestProto.newBuilder()
|
return ContainerCommandRequestProto.newBuilder()
|
||||||
.setCmdType(ContainerProtos.Type.DeleteContainer)
|
.setCmdType(ContainerProtos.Type.DeleteContainer)
|
||||||
.setDeleteContainer(deleteRequest)
|
.setDeleteContainer(deleteRequest)
|
||||||
|
.setTraceID(UUID.randomUUID().toString())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import org.apache.hadoop.scm.XceiverClient;
|
import org.apache.hadoop.scm.XceiverClient;
|
||||||
import org.apache.hadoop.scm.XceiverClientSpi;
|
import org.apache.hadoop.scm.XceiverClientSpi;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -148,7 +149,8 @@ static void runTestOzoneContainerViaDataNode(
|
|||||||
response = client.sendCommand(putKeyRequest);
|
response = client.sendCommand(putKeyRequest);
|
||||||
Assert.assertNotNull(response);
|
Assert.assertNotNull(response);
|
||||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
Assert
|
||||||
|
.assertTrue(putKeyRequest.getTraceID().equals(response.getTraceID()));
|
||||||
|
|
||||||
// Get Key
|
// Get Key
|
||||||
request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
|
request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
|
||||||
@ -298,7 +300,8 @@ public void testCloseContainer() throws Exception {
|
|||||||
Assert.assertNotNull(response);
|
Assert.assertNotNull(response);
|
||||||
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
|
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
|
||||||
response.getResult());
|
response.getResult());
|
||||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
Assert.assertTrue(
|
||||||
|
writeChunkRequest.getTraceID().equals(response.getTraceID()));
|
||||||
|
|
||||||
// Read chunk must work on a closed container.
|
// Read chunk must work on a closed container.
|
||||||
request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
|
request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
|
||||||
@ -314,7 +317,8 @@ public void testCloseContainer() throws Exception {
|
|||||||
Assert.assertNotNull(response);
|
Assert.assertNotNull(response);
|
||||||
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
|
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
|
||||||
response.getResult());
|
response.getResult());
|
||||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
Assert
|
||||||
|
.assertTrue(putKeyRequest.getTraceID().equals(response.getTraceID()));
|
||||||
|
|
||||||
// Get key must work on the closed container.
|
// Get key must work on the closed container.
|
||||||
request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
|
request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
|
||||||
@ -477,6 +481,34 @@ public void testXcieverClientAsync() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidRequest() throws Exception {
|
||||||
|
MiniOzoneCluster cluster = null;
|
||||||
|
XceiverClient client;
|
||||||
|
ContainerProtos.ContainerCommandRequestProto request;
|
||||||
|
try {
|
||||||
|
OzoneConfiguration conf = newOzoneConfiguration();
|
||||||
|
|
||||||
|
client = createClientForTesting(conf);
|
||||||
|
cluster = new MiniOzoneCluster.Builder(conf)
|
||||||
|
.setRandomContainerPort(false)
|
||||||
|
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
||||||
|
client.connect();
|
||||||
|
|
||||||
|
// Send a request without traceId.
|
||||||
|
request = ContainerTestHelper
|
||||||
|
.getRequestWithoutTraceId(client.getPipeline());
|
||||||
|
client.sendCommand(request);
|
||||||
|
Assert.fail("IllegalArgumentException expected");
|
||||||
|
} catch(IllegalArgumentException iae){
|
||||||
|
GenericTestUtils.assertExceptionContains("Invalid trace ID", iae);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static XceiverClient createClientForTesting(OzoneConfiguration conf)
|
private static XceiverClient createClientForTesting(OzoneConfiguration conf)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
@ -518,4 +550,31 @@ private static void createContainerForTesting(XceiverClientSpi client,
|
|||||||
Assert.assertTrue(response.getTraceID().equals(response.getTraceID()));
|
Assert.assertTrue(response.getTraceID().equals(response.getTraceID()));
|
||||||
return writeChunkRequest;
|
return writeChunkRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void runRequestWithoutTraceId(
|
||||||
|
String containerName, XceiverClientSpi client) throws Exception {
|
||||||
|
try {
|
||||||
|
client.connect();
|
||||||
|
|
||||||
|
createContainerForTesting(client, containerName);
|
||||||
|
|
||||||
|
String keyName = OzoneUtils.getRequestID();
|
||||||
|
final ContainerProtos.ContainerCommandRequestProto smallFileRequest
|
||||||
|
= ContainerTestHelper.getWriteSmallFileRequest(
|
||||||
|
client.getPipeline(), containerName, keyName, 1024);
|
||||||
|
|
||||||
|
ContainerProtos.ContainerCommandResponseProto response
|
||||||
|
= client.sendCommand(smallFileRequest);
|
||||||
|
Assert.assertNotNull(response);
|
||||||
|
Assert.assertTrue(smallFileRequest.getTraceID()
|
||||||
|
.equals(response.getTraceID()));
|
||||||
|
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (client != null) {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user