HDDS-1184. Parallelization of write chunks in datanodes is broken. Contributed by Shashikant Banerjee.

This commit is contained in:
Shashikant Banerjee 2019-03-06 10:00:16 +05:30
parent 945b504c25
commit 62e89dc275
2 changed files with 33 additions and 28 deletions

View File

@@ -143,6 +143,7 @@ enum Result {
BCSID_MISMATCH = 38; BCSID_MISMATCH = 38;
CONTAINER_NOT_OPEN = 39; CONTAINER_NOT_OPEN = 39;
CONTAINER_MISSING = 40; CONTAINER_MISSING = 40;
BLOCK_TOKEN_VERIFICATION_FAILED = 41;
} }
/** /**

View File

@@ -26,6 +26,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import io.opentracing.Scope; import io.opentracing.Scope;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupId;
@@ -350,13 +352,20 @@ private ContainerCommandRequestProto getRequestProto(ByteString request)
} }
private ContainerCommandResponseProto dispatchCommand( private ContainerCommandResponseProto dispatchCommand(
ContainerCommandRequestProto requestProto, ContainerCommandRequestProto requestProto, DispatcherContext context) {
DispatcherContext context) throws IOException {
LOG.trace("dispatch {}", requestProto); LOG.trace("dispatch {}", requestProto);
if(isBlockTokenEnabled) { if (isBlockTokenEnabled) {
// ServerInterceptors intercepts incoming request and creates ugi. try {
tokenVerifier.verify(UserGroupInformation.getCurrentUser() // ServerInterceptors intercepts incoming request and creates ugi.
.getShortUserName(), requestProto.getEncodedToken()); tokenVerifier
.verify(UserGroupInformation.getCurrentUser().getShortUserName(),
requestProto.getEncodedToken());
} catch (IOException ioe) {
StorageContainerException sce = new StorageContainerException(
"Block token verification failed. " + ioe.getMessage(), ioe,
ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
return ContainerUtils.logAndReturnError(LOG, sce, requestProto);
}
} }
ContainerCommandResponseProto response = ContainerCommandResponseProto response =
dispatcher.dispatch(requestProto, context); dispatcher.dispatch(requestProto, context);
@@ -365,7 +374,7 @@ private ContainerCommandResponseProto dispatchCommand(
} }
private Message runCommand(ContainerCommandRequestProto requestProto, private Message runCommand(ContainerCommandRequestProto requestProto,
DispatcherContext context) throws IOException { DispatcherContext context) {
return dispatchCommand(requestProto, context)::toByteString; return dispatchCommand(requestProto, context)::toByteString;
} }
@@ -394,14 +403,10 @@ private CompletableFuture<Message> handleWriteChunk(
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA) .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.setCreateContainerSet(createContainerSet) .setCreateContainerSet(createContainerSet)
.build(); .build();
CompletableFuture<Message> writeChunkFuture; // ensure the write chunk happens asynchronously in writeChunkExecutor pool
try { // thread.
Message msg = runCommand(requestProto, context); CompletableFuture<Message> writeChunkFuture = CompletableFuture
writeChunkFuture = CompletableFuture .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
.supplyAsync(() -> msg, chunkExecutor);
}catch(IOException ie) {
writeChunkFuture = completeExceptionally(ie);
}
writeChunkFutureMap.put(entryIndex, writeChunkFuture); writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID() LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
@@ -567,16 +572,18 @@ public CompletableFuture<ByteString> readStateMachineData(
// readStateMachineData should only be called for "write" to Ratis. // readStateMachineData should only be called for "write" to Ratis.
Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto)); Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
if (requestProto.getCmdType() == Type.WriteChunk) { if (requestProto.getCmdType() == Type.WriteChunk) {
CompletableFuture<ByteString> future = new CompletableFuture<>(); final CompletableFuture<ByteString> future = new CompletableFuture<>();
return future.supplyAsync(() -> { CompletableFuture.supplyAsync(() -> {
try { try {
return getCachedStateMachineData(entry.getIndex(), entry.getTerm(), future.complete(
requestProto); getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
requestProto));
} catch (ExecutionException e) { } catch (ExecutionException e) {
future.completeExceptionally(e); future.completeExceptionally(e);
return null;
} }
return future;
}, chunkExecutor); }, chunkExecutor);
return future;
} else { } else {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType() throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
+ " cannot have state machine data"); + " cannot have state machine data");
@@ -627,7 +634,6 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
ContainerCommandRequestProto requestProto = ContainerCommandRequestProto requestProto =
getRequestProto(trx.getStateMachineLogEntry().getLogData()); getRequestProto(trx.getStateMachineLogEntry().getLogData());
Type cmdType = requestProto.getCmdType(); Type cmdType = requestProto.getCmdType();
CompletableFuture<Message> future;
// Make sure that in write chunk, the user data is not set // Make sure that in write chunk, the user data is not set
if (cmdType == Type.WriteChunk) { if (cmdType == Type.WriteChunk) {
Preconditions Preconditions
@@ -638,13 +644,11 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) { if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
builder.setCreateContainerSet(createContainerSet); builder.setCreateContainerSet(createContainerSet);
} }
try { // Ensure the command gets executed in a separate thread than
Message msg = runCommand(requestProto, builder.build()); // stateMachineUpdater thread which is calling applyTransaction here.
future = CompletableFuture.supplyAsync(() -> msg, CompletableFuture<Message> future = CompletableFuture
getCommandExecutor(requestProto)); .supplyAsync(() -> runCommand(requestProto, builder.build()),
} catch (IOException ie) { getCommandExecutor(requestProto));
future = completeExceptionally(ie);
}
lastIndex = index; lastIndex = index;
future.thenAccept(m -> { future.thenAccept(m -> {