HDDS-850. ReadStateMachineData hits OverlappingFileLockException in ContainerStateMachine. Contributed by Shashikant Banerjee.

This commit is contained in:
Shashikant Banerjee 2018-11-29 22:20:08 +05:30
parent 7eb0d3a324
commit 5e102f9aa5
11 changed files with 143 additions and 71 deletions

View File

@ -93,6 +93,14 @@ public final class ScmConfigKeys {
public static final String DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE =
"dfs.container.ratis.log.queue.size";
public static final int DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE_DEFAULT = 128;
// expiry interval stateMachineData cache entry inside containerStateMachine
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
"dfs.container.ratis.statemachine.cache.expiry.interval";
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT =
"10s";
public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
"dfs.ratis.client.request.timeout.duration";
public static final TimeDuration

View File

@ -249,6 +249,15 @@ public final class OzoneConfigKeys {
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT;
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
ScmConfigKeys.
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL;
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT =
ScmConfigKeys.
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT;
public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
"dfs.container.ratis.datanode.storage.dir";
public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =

View File

@ -392,6 +392,7 @@ message WriteChunkResponseProto {
message ReadChunkRequestProto {
required DatanodeBlockID blockID = 1;
required ChunkInfo chunkData = 2;
optional bool readFromTmpFile = 3 [default = false];
}
message ReadChunkResponseProto {

View File

@ -189,6 +189,14 @@
used by Apache Ratis on datanodes.(128 MB by default)
</description>
</property>
<property>
<name>dfs.container.ratis.statemachine.cache.expiry.interval</name>
<value>10s</value>
<tag>OZONE, RATIS, PERFORMANCE</tag>
<description>The interval till which the stateMachine data in ratis
will be cached inside the ContainerStateMachine.
</description>
</property>
<property>
<name>dfs.ratis.client.request.timeout.duration</name>
<value>3s</value>

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@ -27,6 +29,7 @@
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf
@ -60,12 +63,16 @@
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
@ -116,12 +123,11 @@ public class ContainerStateMachine extends BaseStateMachine {
private final XceiverServerRatis ratisServer;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
createContainerFutureMap;
private ExecutorService[] executors;
private final int numExecutors;
private final Map<Long, Long> applyTransactionCompletionMap;
private long lastIndex;
private final Cache<Long, ByteString> stateMachineDataCache;
/**
* CSM metrics.
*/
@ -129,7 +135,7 @@ public class ContainerStateMachine extends BaseStateMachine {
public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
List<ExecutorService> executors) {
List<ExecutorService> executors, long expiryInterval) {
this.gid = gid;
this.dispatcher = dispatcher;
this.chunkExecutor = chunkExecutor;
@ -138,9 +144,13 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
this.numExecutors = executors.size();
this.executors = executors.toArray(new ExecutorService[numExecutors]);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.createContainerFutureMap = new ConcurrentHashMap<>();
applyTransactionCompletionMap = new ConcurrentHashMap<>();
this.lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
stateMachineDataCache = CacheBuilder.newBuilder()
.expireAfterAccess(expiryInterval, TimeUnit.MILLISECONDS)
// set the limit on no of cached entries equal to no of max threads
// executing writeStateMachineData
.maximumSize(chunkExecutor.getCorePoolSize()).build();
}
@Override
@ -257,14 +267,6 @@ public TransactionContext startTransaction(RaftClientRequest request)
.setStateMachineData(dataContainerCommandProto.toByteString())
.setLogData(commitContainerCommandProto.toByteString())
.build();
} else if (proto.getCmdType() == Type.CreateContainer) {
return TransactionContext.newBuilder()
.setClientRequest(request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER)
.setStateMachineData(request.getMessage().getContent())
.setLogData(request.getMessage().getContent())
.build();
} else {
return TransactionContext.newBuilder()
.setClientRequest(request)
@ -310,17 +312,17 @@ private ExecutorService getCommandExecutor(
private CompletableFuture<Message> handleWriteChunk(
ContainerCommandRequestProto requestProto, long entryIndex) {
final WriteChunkRequestProto write = requestProto.getWriteChunk();
long containerID = write.getBlockID().getContainerID();
CompletableFuture<Message> future =
createContainerFutureMap.get(containerID);
CompletableFuture<Message> writeChunkFuture;
if (future != null) {
writeChunkFuture = future.thenApplyAsync(
v -> runCommand(requestProto), chunkExecutor);
} else {
writeChunkFuture = CompletableFuture.supplyAsync(
() -> runCommand(requestProto), chunkExecutor);
RaftServer server = ratisServer.getServer();
Preconditions.checkState(server instanceof RaftServerProxy);
try {
if (((RaftServerProxy) server).getImpl(gid).isLeader()) {
stateMachineDataCache.put(entryIndex, write.getData());
}
} catch (IOException ioe) {
return completeExceptionally(ioe);
}
CompletableFuture<Message> writeChunkFuture = CompletableFuture
.supplyAsync(() -> runCommand(requestProto), chunkExecutor);
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
+ " logIndex " + entryIndex + " chunkName " + write.getChunkData()
@ -337,14 +339,6 @@ private CompletableFuture<Message> handleWriteChunk(
return writeChunkFuture;
}
private CompletableFuture<Message> handleCreateContainer(
ContainerCommandRequestProto requestProto) {
long containerID = requestProto.getContainerID();
createContainerFutureMap.
computeIfAbsent(containerID, k -> new CompletableFuture<>());
return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
}
/*
* writeStateMachineData calls are not synchronized with each other
* and also with applyTransaction.
@ -356,9 +350,10 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
final ContainerCommandRequestProto requestProto =
getRequestProto(getStateMachineData(entry.getStateMachineLogEntry()));
Type cmdType = requestProto.getCmdType();
// For only writeChunk, there will be writeStateMachineData call.
// CreateContainer will happen as a part of writeChunk only.
switch (cmdType) {
case CreateContainer:
return handleCreateContainer(requestProto);
case WriteChunk:
return handleWriteChunk(requestProto, entry.getIndex());
default:
@ -397,7 +392,10 @@ private ByteString readStateMachineData(ContainerCommandRequestProto
ReadChunkRequestProto.Builder readChunkRequestProto =
ReadChunkRequestProto.newBuilder()
.setBlockID(writeChunkRequestProto.getBlockID())
.setChunkData(writeChunkRequestProto.getChunkData());
.setChunkData(writeChunkRequestProto.getChunkData())
// set readFromTempFile to true in case, the chunkFile does
// not exist as applyTransaction is not executed for this entry yet.
.setReadFromTmpFile(true);
ContainerCommandRequestProto dataContainerCommandProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setCmdType(Type.ReadChunk)
@ -409,15 +407,39 @@ private ByteString readStateMachineData(ContainerCommandRequestProto
dispatchCommand(dataContainerCommandProto);
ReadChunkResponseProto responseProto = response.getReadChunk();
ByteString data = responseProto.getData();
// assert that the response has data in it.
Preconditions.checkNotNull(responseProto.getData());
Preconditions.checkNotNull(data);
return data;
}
/**
* Reads the Entry from the Cache or loads it back by reading from disk.
*/
private ByteString getCachedStateMachineData(Long logIndex,
ContainerCommandRequestProto requestProto) throws ExecutionException {
try {
return reconstructWriteChunkRequest(
stateMachineDataCache.get(logIndex, new Callable<ByteString>() {
@Override
public ByteString call() throws Exception {
return readStateMachineData(requestProto);
}
}), requestProto);
} catch (ExecutionException e) {
throw e;
}
}
private ByteString reconstructWriteChunkRequest(ByteString data,
ContainerCommandRequestProto requestProto) {
WriteChunkRequestProto writeChunkRequestProto =
requestProto.getWriteChunk();
// reconstruct the write chunk request
final WriteChunkRequestProto.Builder dataWriteChunkProto =
WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
// adding the state machine data
.setData(responseProto.getData())
.setStage(Stage.WRITE_DATA);
.setData(data).setStage(Stage.WRITE_DATA);
ContainerCommandRequestProto.Builder newStateMachineProto =
ContainerCommandRequestProto.newBuilder(requestProto)
@ -455,18 +477,21 @@ public CompletableFuture<ByteString> readStateMachineData(
if (!getStateMachineData(smLogEntryProto).isEmpty()) {
return CompletableFuture.completedFuture(ByteString.EMPTY);
}
try {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getStateMachineLogEntry().getLogData());
// readStateMachineData should only be called for "write" to Ratis.
Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
if (requestProto.getCmdType() == Type.WriteChunk) {
return CompletableFuture.supplyAsync(() ->
readStateMachineData(requestProto), chunkExecutor);
} else if (requestProto.getCmdType() == Type.CreateContainer) {
return CompletableFuture.completedFuture(requestProto.toByteString());
CompletableFuture<ByteString> future = new CompletableFuture<>();
return future.supplyAsync(() -> {
try {
return getCachedStateMachineData(entry.getIndex(), requestProto);
} catch (ExecutionException e) {
future.completeExceptionally(e);
return null;
}
}, chunkExecutor);
} else {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
+ " cannot have state machine data");
@ -559,19 +584,6 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
future = CompletableFuture.supplyAsync(() -> runCommand(requestProto),
getCommandExecutor(requestProto));
}
// Mark the createContainerFuture complete so that writeStateMachineData
// for WriteChunk gets unblocked
if (cmdType == Type.CreateContainer) {
long containerID = requestProto.getContainerID();
future.thenApply(
r -> {
createContainerFutureMap.remove(containerID).complete(null);
LOG.info("create Container Transaction completed for container " +
containerID + " log index " + index);
return r;
});
}
lastIndex = index;
future.thenAccept(m -> {
final Long previous =
@ -593,6 +605,11 @@ private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
return future;
}
private void evictStateMachineCache() {
stateMachineDataCache.invalidateAll();
stateMachineDataCache.cleanUp();
}
@Override
public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(group, roleInfoProto);
@ -604,7 +621,14 @@ public void notifyExtendedNoLeader(RaftGroup group,
ratisServer.handleNoLeader(group, roleInfoProto);
}
@Override
public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
throws IOException {
evictStateMachineCache();
}
@Override
public void close() throws IOException {
evictStateMachineCache();
}
}

View File

@ -105,6 +105,8 @@ private static long nextCallId() {
private final StateContext context;
private final ReplicationLevel replicationLevel;
private long nodeFailureTimeoutMs;
private final long cacheEntryExpiryInteval;
private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext context)
@ -128,6 +130,11 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
this.executors = new ArrayList<>();
cacheEntryExpiryInteval = conf.getTimeDuration(OzoneConfigKeys.
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL,
OzoneConfigKeys.
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
this.dispatcher = dispatcher;
for (int i = 0; i < numContainerOpExecutors; i++) {
executors.add(Executors.newSingleThreadExecutor());
@ -141,8 +148,8 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
}
private ContainerStateMachine getStateMachine(RaftGroupId gid) {
return new ContainerStateMachine(gid, dispatcher, chunkExecutor,
this, Collections.unmodifiableList(executors));
return new ContainerStateMachine(gid, dispatcher, chunkExecutor, this,
Collections.unmodifiableList(executors), cacheEntryExpiryInteval);
}
private RaftProperties newRaftProperties(Configuration conf) {
@ -304,6 +311,9 @@ private RaftProperties newRaftProperties(Configuration conf) {
RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties,
numSyncRetries);
// Enable the StateMachineCaching
RaftServerConfigKeys.Log.StateMachineData
.setCachingEnabled(properties, true);
return properties;
}

View File

@ -531,7 +531,8 @@ ContainerCommandResponseProto handleReadChunk(
.getChunkData());
Preconditions.checkNotNull(chunkInfo);
data = chunkManager.readChunk(kvContainer, blockID, chunkInfo);
data = chunkManager.readChunk(kvContainer, blockID, chunkInfo,
request.getReadChunk().getReadFromTmpFile());
metrics.incContainerBytesStats(Type.ReadChunk, data.length);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
@ -702,8 +703,10 @@ ContainerCommandResponseProto handleGetSmallFile(
ContainerProtos.ChunkInfo chunkInfo = null;
ByteString dataBuf = ByteString.EMPTY;
for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
// if the block is committed, all chunks must have been committed.
// Tmp chunk files won't exist here.
byte[] data = chunkManager.readChunk(kvContainer, blockID,
ChunkInfo.getFromProtoBuf(chunk));
ChunkInfo.getFromProtoBuf(chunk), false);
ByteString current = ByteString.copyFrom(data);
dataBuf = dataBuf.concat(current);
chunkInfo = chunk;

View File

@ -173,13 +173,14 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
* @param container - Container for the chunk
* @param blockID - ID of the block.
* @param info - ChunkInfo.
* @param readFromTmpFile whether to read from tmp chunk file or not.
* @return byte array
* @throws StorageContainerException
* TODO: Right now we do not support partial reads and writes of chunks.
* TODO: Explore if we need to do that for ozone.
*/
public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info)
throws StorageContainerException {
public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
boolean readFromTmpFile) throws StorageContainerException {
try {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
@ -194,6 +195,12 @@ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info)
if (containerData.getLayOutVersion() == ChunkLayOutVersion
.getLatestVersion().getVersion()) {
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
// In case the chunk file does not exist but tmp chunk file exist,
// read from tmp chunk file if readFromTmpFile is set to true
if (!chunkFile.exists() && readFromTmpFile) {
chunkFile = getTmpChunkFile(chunkFile, info);
}
data = ChunkUtils.readData(chunkFile, info, volumeIOStats);
containerData.incrReadCount();
long length = chunkFile.length();

View File

@ -51,14 +51,15 @@ void writeChunk(Container container, BlockID blockID, ChunkInfo info,
* @param container - Container for the chunk
* @param blockID - ID of the block.
* @param info - ChunkInfo.
* @param readFromTmpFile whether to read from tmp chunk file or not
* @return byte array
* @throws StorageContainerException
*
* TODO: Right now we do not support partial reads and writes of chunks.
* TODO: Explore if we need to do that for ozone.
*/
byte[] readChunk(Container container, BlockID blockID, ChunkInfo info) throws
StorageContainerException;
byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
boolean readFromTmpFile) throws StorageContainerException;
/**
* Deletes a given chunk.

View File

@ -189,7 +189,7 @@ public void testReadChunk() throws Exception {
checkWriteIOStats(data.length, 1);
checkReadIOStats(0, 0);
byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
chunkInfo);
chunkInfo, false);
assertEquals(expectedData.length, data.length);
assertTrue(Arrays.equals(expectedData, data));
checkReadIOStats(data.length, 1);
@ -226,7 +226,7 @@ public void testReadChunkFileNotExists() throws Exception {
try {
// trying to read a chunk, where chunk file does not exist
byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
chunkInfo);
chunkInfo, false);
fail("testReadChunkFileNotExists failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Unable to find the chunk " +
@ -249,7 +249,7 @@ public void testWriteAndReadChunkMultipleTimes() throws Exception {
for (int i=0; i<100; i++) {
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), i), 0, data.length);
chunkManager.readChunk(keyValueContainer, blockID, chunkInfo);
chunkManager.readChunk(keyValueContainer, blockID, chunkInfo, false);
}
checkReadIOStats(data.length*100, 100);
assertTrue(hddsVolume.getVolumeIOStats().getReadTime() > 0);

View File

@ -406,7 +406,7 @@ public void testWritReadManyChunks() throws IOException {
for (int x = 0; x < chunkCount; x++) {
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
ChunkInfo info = fileHashMap.get(fileName);
byte[] data = chunkManager.readChunk(container, blockID, info);
byte[] data = chunkManager.readChunk(container, blockID, info, false);
ChecksumData checksumData = checksum.computeChecksum(data);
Assert.assertEquals(info.getChecksumData(), checksumData);
}
@ -435,11 +435,11 @@ public void testPartialRead() throws Exception {
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
byte[] readData = chunkManager.readChunk(container, blockID, info);
byte[] readData = chunkManager.readChunk(container, blockID, info, false);
assertTrue(Arrays.equals(data, readData));
ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length);
byte[] readData2 = chunkManager.readChunk(container, blockID, info2);
byte[] readData2 = chunkManager.readChunk(container, blockID, info2, false);
assertEquals(length, readData2.length);
assertTrue(Arrays.equals(
Arrays.copyOfRange(data, start, start + length), readData2));
@ -513,7 +513,8 @@ public void testMultipleWriteSingleRead() throws IOException,
// Request to read the whole data in a single go.
ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0,
datalen * chunkCount);
byte[] newdata = chunkManager.readChunk(container, blockID, largeChunk);
byte[] newdata =
chunkManager.readChunk(container, blockID, largeChunk, false);
MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
newSha.update(newdata);
Assert.assertEquals(Hex.encodeHexString(oldSha.digest()),
@ -543,7 +544,7 @@ public void testDeleteChunk() throws IOException,
chunkManager.deleteChunk(container, blockID, info);
exception.expect(StorageContainerException.class);
exception.expectMessage("Unable to find the chunk file.");
chunkManager.readChunk(container, blockID, info);
chunkManager.readChunk(container, blockID, info, false);
}
/**