HDDS-1094. Performance test infrastructure : skip writing user data on Datanode. Contributed by Supratim Deka (#1323)
This commit is contained in:
parent
dc72782008
commit
1407414a52
@ -238,6 +238,12 @@ private HddsConfigKeys() {
|
||||
public static final String HDDS_SECURITY_CLIENT_SCM_CERTIFICATE_PROTOCOL_ACL =
|
||||
"hdds.security.client.scm.certificate.protocol.acl";
|
||||
|
||||
// Determines if the Container Chunk Manager will write user data to disk
|
||||
// Set to false only for specific performance tests
|
||||
public static final String HDDS_CONTAINER_PERSISTDATA =
|
||||
"hdds.container.chunk.persistdata";
|
||||
public static final boolean HDDS_CONTAINER_PERSISTDATA_DEFAULT = true;
|
||||
|
||||
public static final String HDDS_DATANODE_HTTP_ENABLED_KEY =
|
||||
"hdds.datanode.http.enabled";
|
||||
public static final String HDDS_DATANODE_HTTP_BIND_HOST_KEY =
|
||||
|
@ -71,7 +71,7 @@
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
|
||||
@ -114,7 +114,7 @@ public KeyValueHandler(Configuration config, StateContext context,
|
||||
doSyncWrite =
|
||||
conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY,
|
||||
OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT);
|
||||
chunkManager = new ChunkManagerImpl(doSyncWrite);
|
||||
chunkManager = ChunkManagerFactory.getChunkManager(config, doSyncWrite);
|
||||
volumeChoosingPolicy = ReflectionUtils.newInstance(conf.getClass(
|
||||
HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
|
||||
.class, VolumeChoosingPolicy.class), conf);
|
||||
|
@ -0,0 +1,162 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.keyvalue.impl;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
|
||||
|
||||
/**
|
||||
* Implementation of ChunkManager built for running performance tests.
|
||||
* Chunks are not written to disk, Reads are returned with zero-filled buffers
|
||||
*/
|
||||
public class ChunkManagerDummyImpl extends ChunkManagerImpl {
|
||||
static final Logger LOG = LoggerFactory.getLogger(
|
||||
ChunkManagerDummyImpl.class);
|
||||
|
||||
public ChunkManagerDummyImpl(boolean sync) {
|
||||
super(sync);
|
||||
}
|
||||
|
||||
/**
|
||||
* writes a given chunk.
|
||||
*
|
||||
* @param container - Container for the chunk
|
||||
* @param blockID - ID of the block
|
||||
* @param info - ChunkInfo
|
||||
* @param data - data of the chunk
|
||||
* @param dispatcherContext - dispatcherContextInfo
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
@Override
|
||||
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
|
||||
ByteBuffer data, DispatcherContext dispatcherContext)
|
||||
throws StorageContainerException {
|
||||
long writeTimeStart = Time.monotonicNow();
|
||||
|
||||
Preconditions.checkNotNull(dispatcherContext);
|
||||
DispatcherContext.WriteChunkStage stage = dispatcherContext.getStage();
|
||||
|
||||
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
||||
|
||||
try {
|
||||
KeyValueContainerData containerData =
|
||||
(KeyValueContainerData) container.getContainerData();
|
||||
HddsVolume volume = containerData.getVolume();
|
||||
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
|
||||
int bufferSize;
|
||||
|
||||
switch (stage) {
|
||||
case WRITE_DATA:
|
||||
bufferSize = data.capacity();
|
||||
if (bufferSize != info.getLen()) {
|
||||
String err = String.format("data array does not match the length "
|
||||
+ "specified. DataLen: %d Byte Array: %d",
|
||||
info.getLen(), bufferSize);
|
||||
log.error(err);
|
||||
throw new StorageContainerException(err, INVALID_WRITE_SIZE);
|
||||
}
|
||||
|
||||
// Increment volumeIO stats here.
|
||||
volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
|
||||
volumeIOStats.incWriteOpCount();
|
||||
volumeIOStats.incWriteBytes(info.getLen());
|
||||
break;
|
||||
case COMMIT_DATA:
|
||||
updateContainerWriteStats(container, info, false);
|
||||
break;
|
||||
case COMBINED:
|
||||
updateContainerWriteStats(container, info, false);
|
||||
break;
|
||||
default:
|
||||
throw new IOException("Can not identify write operation.");
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.error("write data failed. error: {}", ex);
|
||||
throw new StorageContainerException("Internal error: ", ex,
|
||||
CONTAINER_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* return a zero-filled buffer.
|
||||
*
|
||||
* @param container - Container for the chunk
|
||||
* @param blockID - ID of the block.
|
||||
* @param info - ChunkInfo.
|
||||
* @param dispatcherContext dispatcher context info.
|
||||
* @return byte array
|
||||
* TODO: Right now we do not support partial reads and writes of chunks.
|
||||
* TODO: Explore if we need to do that for ozone.
|
||||
*/
|
||||
@Override
|
||||
public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
|
||||
DispatcherContext dispatcherContext) {
|
||||
|
||||
long readStartTime = Time.monotonicNow();
|
||||
|
||||
KeyValueContainerData containerData = (KeyValueContainerData) container
|
||||
.getContainerData();
|
||||
ByteBuffer data;
|
||||
HddsVolume volume = containerData.getVolume();
|
||||
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
|
||||
|
||||
data = ByteBuffer.allocate((int) info.getLen());
|
||||
|
||||
// Increment volumeIO stats here.
|
||||
volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
|
||||
volumeIOStats.incReadOpCount();
|
||||
volumeIOStats.incReadBytes(info.getLen());
|
||||
|
||||
return data.array();
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a given chunk - Do nothing except stats.
|
||||
*
|
||||
* @param container - Container for the chunk
|
||||
* @param blockID - ID of the block
|
||||
* @param info - Chunk Info
|
||||
*/
|
||||
@Override
|
||||
public void deleteChunk(Container container, BlockID blockID,
|
||||
ChunkInfo info) {
|
||||
Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
|
||||
KeyValueContainerData containerData =
|
||||
(KeyValueContainerData) container.getContainerData();
|
||||
|
||||
if (info.getOffset() == 0) {
|
||||
containerData.decrBytesUsed(info.getLen());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,90 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.keyvalue.impl;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA;
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA_DEFAULT;
|
||||
|
||||
/**
|
||||
* Select an appropriate ChunkManager implementation as per config setting.
|
||||
* Ozone ChunkManager is a Singleton
|
||||
*/
|
||||
public final class ChunkManagerFactory {
|
||||
static final Logger LOG = LoggerFactory.getLogger(ChunkManagerFactory.class);
|
||||
|
||||
private static volatile ChunkManager instance = null;
|
||||
private static boolean syncChunks = false;
|
||||
|
||||
private ChunkManagerFactory() {
|
||||
}
|
||||
|
||||
public static ChunkManager getChunkManager(Configuration config,
|
||||
boolean sync) {
|
||||
if (instance == null) {
|
||||
synchronized (ChunkManagerFactory.class) {
|
||||
if (instance == null) {
|
||||
instance = createChunkManager(config, sync);
|
||||
syncChunks = sync;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Preconditions.checkArgument((syncChunks == sync),
|
||||
"value of sync conflicts with previous invocation");
|
||||
return instance;
|
||||
}
|
||||
|
||||
private static ChunkManager createChunkManager(Configuration config,
|
||||
boolean sync) {
|
||||
ChunkManager manager = null;
|
||||
boolean persist = config.getBoolean(HDDS_CONTAINER_PERSISTDATA,
|
||||
HDDS_CONTAINER_PERSISTDATA_DEFAULT);
|
||||
|
||||
if (!persist) {
|
||||
boolean scrubber = config.getBoolean(
|
||||
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED,
|
||||
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT);
|
||||
if (scrubber) {
|
||||
// Data Scrubber needs to be disabled for non-persistent chunks.
|
||||
LOG.warn("Failed to set " + HDDS_CONTAINER_PERSISTDATA + " to false."
|
||||
+ " Please set " + HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED
|
||||
+ " also to false to enable non-persistent containers.");
|
||||
persist = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (persist) {
|
||||
manager = new ChunkManagerImpl(sync);
|
||||
} else {
|
||||
LOG.warn(HDDS_CONTAINER_PERSISTDATA
|
||||
+ " is set to false. This should be used only for testing."
|
||||
+ " All user data will be discarded.");
|
||||
manager = new ChunkManagerDummyImpl(sync);
|
||||
}
|
||||
|
||||
return manager;
|
||||
}
|
||||
}
|
@ -142,18 +142,12 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
|
||||
// the same term and log index appended as the current transaction
|
||||
commitChunk(tmpChunkFile, chunkFile);
|
||||
// Increment container stats here, as we commit the data.
|
||||
containerData.incrBytesUsed(info.getLen());
|
||||
containerData.incrWriteCount();
|
||||
containerData.incrWriteBytes(info.getLen());
|
||||
updateContainerWriteStats(container, info, isOverwrite);
|
||||
break;
|
||||
case COMBINED:
|
||||
// directly write to the chunk file
|
||||
ChunkUtils.writeData(chunkFile, info, data, volumeIOStats, doSyncWrite);
|
||||
if (!isOverwrite) {
|
||||
containerData.incrBytesUsed(info.getLen());
|
||||
}
|
||||
containerData.incrWriteCount();
|
||||
containerData.incrWriteBytes(info.getLen());
|
||||
updateContainerWriteStats(container, info, isOverwrite);
|
||||
break;
|
||||
default:
|
||||
throw new IOException("Can not identify write operation.");
|
||||
@ -176,6 +170,18 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
|
||||
}
|
||||
}
|
||||
|
||||
protected void updateContainerWriteStats(Container container, ChunkInfo info,
|
||||
boolean isOverwrite) {
|
||||
KeyValueContainerData containerData = (KeyValueContainerData) container
|
||||
.getContainerData();
|
||||
|
||||
if (!isOverwrite) {
|
||||
containerData.incrBytesUsed(info.getLen());
|
||||
}
|
||||
containerData.incrWriteCount();
|
||||
containerData.incrWriteBytes(info.getLen());
|
||||
}
|
||||
|
||||
/**
|
||||
* reads the data defined by a chunk.
|
||||
*
|
||||
|
@ -40,6 +40,7 @@
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
|
||||
import org.apache.hadoop.hdds.client.OzoneQuota;
|
||||
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
||||
@ -251,6 +252,13 @@ public void init(OzoneConfiguration configuration) throws IOException {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
if (ozoneConfiguration != null) {
|
||||
if (!ozoneConfiguration.getBoolean(
|
||||
HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA,
|
||||
HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA_DEFAULT)) {
|
||||
LOG.info("Override validateWrites to false, because "
|
||||
+ HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA + " is set to false.");
|
||||
validateWrites = false;
|
||||
}
|
||||
init(ozoneConfiguration);
|
||||
} else {
|
||||
init(freon.createOzoneConfiguration());
|
||||
@ -282,6 +290,7 @@ public Void call() throws Exception {
|
||||
LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
|
||||
LOG.info("Key size: {} bytes", keySize);
|
||||
LOG.info("Buffer size: {} bytes", bufferSize);
|
||||
LOG.info("validateWrites : {}", validateWrites);
|
||||
for (int i = 0; i < numOfThreads; i++) {
|
||||
executor.submit(new ObjectCreator());
|
||||
}
|
||||
@ -548,7 +557,7 @@ long getSuccessfulValidationCount() {
|
||||
*/
|
||||
@VisibleForTesting
|
||||
long getUnsuccessfulValidationCount() {
|
||||
return writeValidationFailureCount;
|
||||
return validateWrites ? writeValidationFailureCount : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -0,0 +1,71 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.freon;
|
||||
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Tests Freon with MiniOzoneCluster and ChunkManagerDummyImpl.
|
||||
* Data validation is disabled in RandomKeyGenerator.
|
||||
*/
|
||||
public class TestDataValidateWithDummyContainers
|
||||
extends TestDataValidate {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestDataValidate.class);
|
||||
/**
|
||||
* Create a MiniDFSCluster for testing.
|
||||
* <p>
|
||||
* Ozone is made active by setting OZONE_ENABLED = true
|
||||
*
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setBoolean(HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, false);
|
||||
conf.setBoolean(HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA, false);
|
||||
conf.setBoolean(OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
|
||||
false);
|
||||
startCluster(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write validation is not supported for non-persistent containers.
|
||||
* This test is a no-op.
|
||||
*/
|
||||
@Test
|
||||
@Override
|
||||
public void validateWriteTest() throws Exception {
|
||||
LOG.info("Skipping validateWriteTest for non-persistent containers.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown MiniDFSCluster.
|
||||
*/
|
||||
@AfterClass
|
||||
public static void shutdown() {
|
||||
shutdownCluster();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user