HDFS-11835. Block Storage: Overwrite of blocks fails. Contributed by Mukul Kumar Singh.
This commit is contained in:
parent
ca70300eea
commit
4fb9064523
@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
|||||||
.ReadContainerResponseProto;
|
.ReadContainerResponseProto;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||||
.ReadContainerRequestProto;
|
.ReadContainerRequestProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -199,11 +200,14 @@ public final class ContainerProtocolCalls {
|
|||||||
.setPipeline(client.getPipeline().getProtobufMessage())
|
.setPipeline(client.getPipeline().getProtobufMessage())
|
||||||
.setKeyData(containerKeyData);
|
.setKeyData(containerKeyData);
|
||||||
|
|
||||||
|
KeyValue keyValue = KeyValue.newBuilder()
|
||||||
|
.setKey("OverWriteRequested").setValue("true").build();
|
||||||
ChunkInfo chunk = ChunkInfo
|
ChunkInfo chunk = ChunkInfo
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
.setChunkName(key + "_chunk")
|
.setChunkName(key + "_chunk")
|
||||||
.setOffset(0)
|
.setOffset(0)
|
||||||
.setLen(data.length)
|
.setLen(data.length)
|
||||||
|
.addMetadata(keyValue)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
PutSmallFileRequestProto putSmallFileRequest = PutSmallFileRequestProto
|
PutSmallFileRequestProto putSmallFileRequest = PutSmallFileRequestProto
|
||||||
|
@ -391,4 +391,59 @@ public class TestBufferManager {
|
|||||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
|
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
|
||||||
Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
|
Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRepeatedBlockWrites() throws IOException,
|
||||||
|
InterruptedException, TimeoutException{
|
||||||
|
// Create a new config so that this tests write metafile to new location
|
||||||
|
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||||
|
URL p = flushTestConfig.getClass().getResource("");
|
||||||
|
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
||||||
|
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||||
|
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||||
|
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||||
|
|
||||||
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||||
|
String data = RandomStringUtils.random(4 * KB);
|
||||||
|
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||||
|
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||||
|
xceiverClientManager, metrics);
|
||||||
|
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||||
|
.setConfiguration(flushTestConfig)
|
||||||
|
.setVolumeName(volumeName)
|
||||||
|
.setUserName(userName)
|
||||||
|
.setPipelines(createContainerAndGetPipeline(10))
|
||||||
|
.setClientManager(xceiverClientManager)
|
||||||
|
.setBlockSize(4 * KB)
|
||||||
|
.setVolumeSize(50 * GB)
|
||||||
|
.setFlusher(flusher)
|
||||||
|
.setCBlockTargetMetrics(metrics)
|
||||||
|
.build();
|
||||||
|
Thread fllushListenerThread = new Thread(flusher);
|
||||||
|
fllushListenerThread.setDaemon(true);
|
||||||
|
fllushListenerThread.start();
|
||||||
|
cache.start();
|
||||||
|
for (int i = 0; i < 512; i++) {
|
||||||
|
cache.put(i, data.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
Assert.assertEquals(512, metrics.getNumWriteOps());
|
||||||
|
Assert.assertEquals(512, metrics.getNumBlockBufferUpdates());
|
||||||
|
Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
|
||||||
|
Thread.sleep(5000);
|
||||||
|
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i < 512; i++) {
|
||||||
|
cache.put(i, data.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
Assert.assertEquals(1024, metrics.getNumWriteOps());
|
||||||
|
Assert.assertEquals(1024, metrics.getNumBlockBufferUpdates());
|
||||||
|
Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
|
||||||
|
|
||||||
|
Thread.sleep(5000);
|
||||||
|
Assert.assertEquals(0, metrics.getNumWriteIOExceptionRetryBlocks());
|
||||||
|
Assert.assertEquals(0, metrics.getNumWriteGenericExceptionRetryBlocks());
|
||||||
|
Assert.assertEquals(2, metrics.getNumBlockBufferFlushCompleted());
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user