HDDS-890. Handle OverlappingFileLockException during writeStateMachineData in ContainerStateMachine. Contributed by Shashikant Banerjee.

Mukul Kumar Singh 2018-12-04 20:12:35 +05:30
parent ff31313d83
commit 7274115d57
5 changed files with 93 additions and 62 deletions
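The fix, in brief: the chunk read/write path now receives a DispatcherContext carrying the write stage plus the raft term and log index of the transaction, and getTmpChunkFile() appends that term and index to the tmp chunk file name. Two transactions touching the same chunk during writeStateMachineData therefore lock different tmp files instead of colliding on one shared name. A minimal sketch of the resulting naming scheme, assuming "." for OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER and "tmp" for OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX (neither constant's value appears in this diff):

  // Sketch only: mirrors the new getTmpChunkFile() under the assumed
  // constant values above. For chunk file "2.data.0" written at term 3,
  // log index 45, this yields "2.data.0.tmp.3.45" rather than the old
  // shared "2.data.0.tmp".
  static String tmpChunkName(String chunkName, long term, long logIndex) {
    final String delimiter = ".";   // assumed CONTAINER_CHUNK_NAME_DELIMITER
    final String tmpPrefix = "tmp"; // assumed CONTAINER_TEMPORARY_CHUNK_PREFIX
    return chunkName + delimiter + tmpPrefix
        + delimiter + term
        + delimiter + logIndex;
  }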

KeyValueHandler.java

@@ -545,10 +545,12 @@ ContainerCommandResponseProto handleReadChunk(
           .getChunkData());
       Preconditions.checkNotNull(chunkInfo);

-      boolean isReadFromTmpFile = dispatcherContext == null ? false :
-          dispatcherContext.isReadFromTmpFile();
+      if (dispatcherContext == null) {
+        dispatcherContext = new DispatcherContext.Builder().build();
+      }
       data = chunkManager
-          .readChunk(kvContainer, blockID, chunkInfo, isReadFromTmpFile);
+          .readChunk(kvContainer, blockID, chunkInfo, dispatcherContext);
       metrics.incContainerBytesStats(Type.ReadChunk, data.length);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -619,15 +621,17 @@ ContainerCommandResponseProto handleWriteChunk(
       Preconditions.checkNotNull(chunkInfo);

       ByteBuffer data = null;
-      WriteChunkStage stage =
-          dispatcherContext == null ? WriteChunkStage.COMBINED :
-              dispatcherContext.getStage();
+      if (dispatcherContext == null) {
+        dispatcherContext = new DispatcherContext.Builder().build();
+      }
+      WriteChunkStage stage = dispatcherContext.getStage();
       if (stage == WriteChunkStage.WRITE_DATA ||
           stage == WriteChunkStage.COMBINED) {
         data = request.getWriteChunk().getData().asReadOnlyByteBuffer();
       }

-      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage);
+      chunkManager
+          .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext);

       // We should increment stats after writeChunk
       if (stage == WriteChunkStage.WRITE_DATA||
@@ -677,19 +681,19 @@ ContainerCommandResponseProto handlePutSmallFile(
           putSmallFileReq.getChunkInfo());
       Preconditions.checkNotNull(chunkInfo);
       ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer();
-      WriteChunkStage stage =
-          dispatcherContext == null ? WriteChunkStage.COMBINED :
-              dispatcherContext.getStage();
+      if (dispatcherContext == null) {
+        dispatcherContext = new DispatcherContext.Builder().build();
+      }
       // chunks will be committed as a part of handling putSmallFile
       // here. There is no need to maintain this info in openContainerBlockMap.
-      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage);
+      chunkManager
+          .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext);

       List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
       chunks.add(chunkInfo.getProtoBufMessage());
       blockData.setChunks(chunks);
-      long bcsId =
-          dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
-      blockData.setBlockCommitSequenceId(bcsId);
+      blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex());

       blockManager.putBlock(kvContainer, blockData);
       metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity());
@@ -728,11 +732,13 @@ ContainerCommandResponseProto handleGetSmallFile(
       ContainerProtos.ChunkInfo chunkInfo = null;
       ByteString dataBuf = ByteString.EMPTY;
+      DispatcherContext dispatcherContext =
+          new DispatcherContext.Builder().build();
       for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
         // if the block is committed, all chunks must have been committed.
         // Tmp chunk files won't exist here.
         byte[] data = chunkManager.readChunk(kvContainer, blockID,
-            ChunkInfo.getFromProtoBuf(chunk), false);
+            ChunkInfo.getFromProtoBuf(chunk), dispatcherContext);
         ByteString current = ByteString.copyFrom(data);
         dataBuf = dataBuf.concat(current);
         chunkInfo = chunk;

ChunkManagerImpl.java

@@ -67,13 +67,14 @@ public ChunkManagerImpl(boolean sync) {
    * @param blockID - ID of the block
    * @param info - ChunkInfo
    * @param data - data of the chunk
-   * @param stage - Stage of the Chunk operation
+   * @param dispatcherContext - dispatcherContextInfo
    * @throws StorageContainerException
    */
   public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-      ByteBuffer data, DispatcherContext.WriteChunkStage stage)
+      ByteBuffer data, DispatcherContext dispatcherContext)
       throws StorageContainerException {
+    Preconditions.checkNotNull(dispatcherContext);
+    DispatcherContext.WriteChunkStage stage = dispatcherContext.getStage();
     try {

       KeyValueContainerData containerData = (KeyValueContainerData) container
@@ -85,7 +86,7 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
       boolean isOverwrite = ChunkUtils.validateChunkForOverwrite(
           chunkFile, info);
-      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
+      File tmpChunkFile = getTmpChunkFile(chunkFile, dispatcherContext);

       LOG.debug(
           "writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file:{}",
@@ -137,6 +138,8 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
           LOG.warn("ChunkFile already exists" + chunkFile);
           return;
         }
+        // While committing a chunk, just rename the tmp chunk file which has
+        // the same term and log index appended as the current transaction.
         commitChunk(tmpChunkFile, chunkFile);
         // Increment container stats here, as we commit the data.
         containerData.incrBytesUsed(info.getLen());
@@ -179,14 +182,14 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
    * @param container - Container for the chunk
    * @param blockID - ID of the block.
    * @param info - ChunkInfo.
-   * @param readFromTmpFile whether to read from tmp chunk file or not.
+   * @param dispatcherContext dispatcher context info.
    * @return byte array
    * @throws StorageContainerException
    * TODO: Right now we do not support partial reads and writes of chunks.
    * TODO: Explore if we need to do that for ozone.
    */
   public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
-      boolean readFromTmpFile) throws StorageContainerException {
+      DispatcherContext dispatcherContext) throws StorageContainerException {
     try {
       KeyValueContainerData containerData = (KeyValueContainerData) container
           .getContainerData();
@@ -204,8 +207,8 @@ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
       // In case the chunk file does not exist but tmp chunk file exist,
       // read from tmp chunk file if readFromTmpFile is set to true
-      if (!chunkFile.exists() && readFromTmpFile) {
-        chunkFile = getTmpChunkFile(chunkFile, info);
+      if (!chunkFile.exists() && dispatcherContext.isReadFromTmpFile()) {
+        chunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
       }
       data = ChunkUtils.readData(chunkFile, info, volumeIOStats);
       containerData.incrReadCount();
@@ -279,17 +282,21 @@ public void shutdown() {
   /**
    * Returns the temporary chunkFile path.
-   * @param chunkFile
-   * @param info
+   * @param chunkFile chunkFileName
+   * @param dispatcherContext dispatcher context info
    * @return temporary chunkFile path
-   * @throws StorageContainerException
    */
-  private File getTmpChunkFile(File chunkFile, ChunkInfo info)
-      throws StorageContainerException {
+  private File getTmpChunkFile(File chunkFile,
+      DispatcherContext dispatcherContext) {
     return new File(chunkFile.getParent(),
         chunkFile.getName() +
             OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
-            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
+            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX +
+            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
+            dispatcherContext.getTerm() +
+            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
+            dispatcherContext.getLogIndex());
   }

   /**
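For reference, this is roughly how a Ratis-side caller such as ContainerStateMachine would describe a write transaction to the new API. Only setStage() and build() appear verbatim in this diff; setTerm() and setLogIndex() are assumed builder setters matching the getTerm()/getLogIndex() getters used above, and `entry` stands in for the raft log entry being applied:

  // Hedged sketch: per-transaction context for the WRITE_DATA stage.
  DispatcherContext context = new DispatcherContext.Builder()
      .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
      .setTerm(entry.getTerm())       // raft term of the entry (assumed setter)
      .setLogIndex(entry.getIndex())  // raft log index of the entry (assumed setter)
      .build();
  chunkManager.writeChunk(container, blockID, chunkInfo, data, context);

Because the term and log index are baked into the tmp file name, a retried or replayed WRITE_DATA for the same chunk at a different log entry opens a different file, which is what removes the OverlappingFileLockException.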

ChunkManager.java

@@ -39,11 +39,11 @@ public interface ChunkManager {
    * @param container - Container for the chunk
    * @param blockID - ID of the block.
    * @param info - ChunkInfo.
-   * @param stage - Chunk Stage write.
+   * @param dispatcherContext - dispatcher context info.
    * @throws StorageContainerException
    */
   void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-      ByteBuffer data, DispatcherContext.WriteChunkStage stage)
+      ByteBuffer data, DispatcherContext dispatcherContext)
       throws StorageContainerException;

   /**
@@ -52,7 +52,7 @@ void writeChunk(Container container, BlockID blockID, ChunkInfo info,
    * @param container - Container for the chunk
    * @param blockID - ID of the block.
    * @param info - ChunkInfo.
-   * @param readFromTmpFile whether to read from tmp chunk file or not
+   * @param dispatcherContext - dispatcher context info.
    * @return byte array
    * @throws StorageContainerException
    *
@@ -60,7 +60,7 @@ void writeChunk(Container container, BlockID blockID, ChunkInfo info,
    * TODO: Explore if we need to do that for ozone.
    */
   byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
-      boolean readFromTmpFile) throws StorageContainerException;
+      DispatcherContext dispatcherContext) throws StorageContainerException;

   /**
    * Deletes a given chunk.
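Callers outside the Ratis pipeline no longer pass a boolean or a stage; the diff's pattern for them is a default-built context. Judging by the code it replaces, a default context behaves like the old COMBINED stage with readFromTmpFile == false:

  // Default context for non-Ratis callers (stand-alone reads and writes).
  DispatcherContext defaultContext = new DispatcherContext.Builder().build();
  byte[] data = chunkManager.readChunk(container, blockID, chunkInfo,
      defaultContext);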

TestChunkManagerImpl.java

@@ -25,7 +25,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
@@ -103,6 +103,10 @@ public void setUp() throws Exception {
   }

+  private DispatcherContext getDispatcherContext() {
+    return new DispatcherContext.Builder().build();
+  }
+
   @Test
   public void testWriteChunkStageWriteAndCommit() throws Exception {
     //As in Setup, we try to create container, these paths should exist.
@@ -115,16 +119,20 @@ public void testWriteChunkStageWriteAndCommit() throws Exception {
     // As no chunks are written to the volume writeBytes should be 0
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA);
+        ByteBuffer.wrap(data), new DispatcherContext.Builder()
+            .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build());
     // Now a chunk file is being written with Stage WRITE_DATA, so it should
     // create a temporary chunk file.
     assertTrue(chunksPath.listFiles().length == 1);

+    long term = 0;
+    long index = 0;
     File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo);
     File tempChunkFile = new File(chunkFile.getParent(),
-        chunkFile.getName() +
-            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
-            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
+        chunkFile.getName() + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER
+            + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX
+            + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + term
+            + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + index);

     // As chunk write stage is WRITE_DATA, temp chunk file will be created.
     assertTrue(tempChunkFile.exists());
@@ -132,7 +140,8 @@ public void testWriteChunkStageWriteAndCommit() throws Exception {
     checkWriteIOStats(data.length, 1);

     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.COMMIT_DATA);
+        ByteBuffer.wrap(data), new DispatcherContext.Builder()
+            .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build());

     checkWriteIOStats(data.length, 1);
@@ -152,7 +161,7 @@ public void testWriteChunkIncorrectLength() throws Exception {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA);
+          ByteBuffer.wrap(data), getDispatcherContext());
       fail("testWriteChunkIncorrectLength failed");
     } catch (StorageContainerException ex) {
       // As we got an exception, writeBytes should be 0.
@@ -173,7 +182,7 @@ public void testWriteChunkStageCombinedData() throws Exception {
     assertTrue(chunksPath.listFiles().length == 0);
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+        ByteBuffer.wrap(data), getDispatcherContext());
     // Now a chunk file is being written with Stage COMBINED_DATA, so it should
     // create a chunk file.
     assertTrue(chunksPath.listFiles().length == 1);
@@ -186,11 +195,11 @@
   public void testReadChunk() throws Exception {
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+        ByteBuffer.wrap(data), getDispatcherContext());
     checkWriteIOStats(data.length, 1);
     checkReadIOStats(0, 0);
     byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
-        chunkInfo, false);
+        chunkInfo, getDispatcherContext());
     assertEquals(expectedData.length, data.length);
     assertTrue(Arrays.equals(expectedData, data));
     checkReadIOStats(data.length, 1);
@@ -200,7 +209,7 @@ public void testReadChunk() throws Exception {
   public void testDeleteChunk() throws Exception {
     File chunksPath = new File(keyValueContainerData.getChunksPath());
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+        ByteBuffer.wrap(data), getDispatcherContext());
     assertTrue(chunksPath.listFiles().length == 1);
     chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
     assertTrue(chunksPath.listFiles().length == 0);
@@ -210,7 +219,7 @@ public void testDeleteChunk() throws Exception {
   public void testDeleteChunkUnsupportedRequest() throws Exception {
     try {
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+          ByteBuffer.wrap(data), getDispatcherContext());
       long randomLength = 200L;
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
@@ -227,7 +236,7 @@ public void testReadChunkFileNotExists() throws Exception {
     try {
       // trying to read a chunk, where chunk file does not exist
       byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
-          chunkInfo, false);
+          chunkInfo, getDispatcherContext());
       fail("testReadChunkFileNotExists failed");
     } catch (StorageContainerException ex) {
       GenericTestUtils.assertExceptionContains("Unable to find the chunk " +
@@ -242,7 +251,7 @@ public void testWriteAndReadChunkMultipleTimes() throws Exception {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), i), 0, data.length);
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+          ByteBuffer.wrap(data), getDispatcherContext());
     }
     checkWriteIOStats(data.length*100, 100);
     assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);
@@ -250,7 +259,8 @@ public void testWriteAndReadChunkMultipleTimes() throws Exception {
     for (int i=0; i<100; i++) {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), i), 0, data.length);
-      chunkManager.readChunk(keyValueContainer, blockID, chunkInfo, false);
+      chunkManager.readChunk(keyValueContainer, blockID, chunkInfo,
+          getDispatcherContext());
     }
     checkReadIOStats(data.length*100, 100);
     assertTrue(hddsVolume.getVolumeIOStats().getReadTime() > 0);

TestContainerPersistence.java

@@ -36,6 +36,7 @@
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
@@ -78,7 +79,6 @@
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
@@ -156,6 +156,10 @@ private long getTestContainerID() {
     return ContainerTestHelper.getTestContainerID();
   }

+  private DispatcherContext getDispatcherContext() {
+    return new DispatcherContext.Builder().build();
+  }
+
   private Container addContainer(ContainerSet cSet, long cID)
       throws IOException {
     KeyValueContainerData data = new KeyValueContainerData(cID,
@@ -334,7 +338,7 @@ private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     return info;
   }
@@ -375,7 +379,7 @@ public void testWritReadManyChunks() throws IOException {
       byte[] data = getData(datalen);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-          WriteChunkStage.COMBINED);
+          getDispatcherContext());
       String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
       fileHashMap.put(fileName, info);
     }
@@ -406,7 +410,8 @@ public void testWritReadManyChunks() throws IOException {
     for (int x = 0; x < chunkCount; x++) {
       String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
       ChunkInfo info = fileHashMap.get(fileName);
-      byte[] data = chunkManager.readChunk(container, blockID, info, false);
+      byte[] data = chunkManager
+          .readChunk(container, blockID, info, getDispatcherContext());
       ChecksumData checksumData = checksum.computeChecksum(data);
       Assert.assertEquals(info.getChecksumData(), checksumData);
     }
@@ -433,13 +438,15 @@ public void testPartialRead() throws Exception {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());

-    byte[] readData = chunkManager.readChunk(container, blockID, info, false);
+    byte[] readData = chunkManager
+        .readChunk(container, blockID, info, getDispatcherContext());
     assertTrue(Arrays.equals(data, readData));

     ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length);
-    byte[] readData2 = chunkManager.readChunk(container, blockID, info2, false);
+    byte[] readData2 = chunkManager
+        .readChunk(container, blockID, info2, getDispatcherContext());
     assertEquals(length, readData2.length);
     assertTrue(Arrays.equals(
         Arrays.copyOfRange(data, start, start + length), readData2));
@@ -466,13 +473,13 @@ public void testOverWrite() throws IOException,
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     // With the overwrite flag it should work now.
     info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     long bytesUsed = container.getContainerData().getBytesUsed();
     Assert.assertEquals(datalen, bytesUsed);
@@ -507,14 +514,15 @@ public void testMultipleWriteSingleRead() throws IOException,
       oldSha.update(data);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-          WriteChunkStage.COMBINED);
+          getDispatcherContext());
     }

     // Request to read the whole data in a single go.
     ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0,
         datalen * chunkCount);
     byte[] newdata =
-        chunkManager.readChunk(container, blockID, largeChunk, false);
+        chunkManager.readChunk(container, blockID, largeChunk,
+            getDispatcherContext());
     MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
     newSha.update(newdata);
     Assert.assertEquals(Hex.encodeHexString(oldSha.digest()),
@@ -540,11 +548,11 @@ public void testDeleteChunk() throws IOException,
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     chunkManager.deleteChunk(container, blockID, info);
     exception.expect(StorageContainerException.class);
     exception.expectMessage("Unable to find the chunk file.");
-    chunkManager.readChunk(container, blockID, info, false);
+    chunkManager.readChunk(container, blockID, info, getDispatcherContext());
   }
@@ -655,7 +663,7 @@ public void testPutBlockWithLotsOfChunks() throws IOException,
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     totalSize += datalen;
     chunkList.add(info);
   }