HDDS-1766. ContainerStateMachine is unable to increment lastAppliedTermIndex. Contributed by Mukul Kumar Singh. (#1072)

This commit is contained in:
Mukul Kumar Singh 2019-07-14 10:53:51 +05:30 committed by GitHub
parent 4a70a0d816
commit 0976f6fc30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -197,17 +197,16 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
if (snapshot == null) {
TermIndex empty =
TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
LOG.info(
"The snapshot info is null." + "Setting the last applied index to:"
+ empty);
LOG.info("{}: The snapshot info is null. Setting the last applied index" +
"to:{}", gid, empty);
setLastAppliedTermIndex(empty);
return RaftLog.INVALID_LOG_INDEX;
return empty.getIndex();
}
final File snapshotFile = snapshot.getFile().getPath().toFile();
final TermIndex last =
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
LOG.info("Setting the last applied index to " + last);
LOG.info("{}: Setting the last applied index to {}", gid, last);
setLastAppliedTermIndex(last);
// initialize the dispatcher with snapshot so that it build the missing
@ -243,18 +242,20 @@ public void persistContainerSet(OutputStream out) throws IOException {
@Override
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
LOG.info("Taking snapshot at termIndex:" + ti);
long startTime = Time.monotonicNow();
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
LOG.info("Taking a snapshot to file {}", snapshotFile);
LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
persistContainerSet(fos);
} catch (IOException ioe) {
LOG.warn("Failed to write snapshot file \"" + snapshotFile
+ "\", last applied index=" + ti);
LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
snapshotFile);
throw ioe;
}
LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
return ti.getIndex();
}
return -1;
@ -337,7 +338,7 @@ private ContainerCommandRequestProto getContainerCommandRequestProto(
private ContainerCommandResponseProto dispatchCommand(
ContainerCommandRequestProto requestProto, DispatcherContext context) {
LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
requestProto.getCmdType(), requestProto.getContainerID(),
requestProto.getPipelineID(), requestProto.getTraceID());
if (isBlockTokenEnabled) {
@ -355,7 +356,7 @@ private ContainerCommandResponseProto dispatchCommand(
}
ContainerCommandResponseProto response =
dispatcher.dispatch(requestProto, context);
LOG.trace("response {}", response);
LOG.trace("{}: response {}", gid, response);
return response;
}
@ -395,18 +396,18 @@ private CompletableFuture<Message> handleWriteChunk(
.supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
+ " logIndex " + entryIndex + " chunkName " + write.getChunkData()
.getChunkName());
LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+ write.getChunkData().getChunkName());
// Remove the future once it finishes execution from the
// writeChunkFutureMap.
writeChunkFuture.thenApply(r -> {
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
writeChunkFutureMap.remove(entryIndex);
LOG.debug("writeChunk writeStateMachineData completed: blockId " + write
.getBlockID() + " logIndex " + entryIndex + " chunkName " + write
.getChunkData().getChunkName());
LOG.debug(gid + ": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+ write.getChunkData().getChunkName());
return r;
});
return writeChunkFuture;
@ -564,12 +565,12 @@ public CompletableFuture<ByteString> readStateMachineData(
}
} catch (Exception e) {
metrics.incNumReadStateMachineFails();
LOG.error("unable to read stateMachineData:" + e);
LOG.error("{} unable to read stateMachineData:", gid, e);
return completeExceptionally(e);
}
}
private void updateLastApplied() {
private synchronized void updateLastApplied() {
Long appliedTerm = null;
long appliedIndex = -1;
for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {