HDDS-1234. Iterate the OM DB snapshot and populate the recon container DB. Contributed by Aravindan Vijayan.
This commit is contained in:
parent
67dd45fc25
commit
e5d72f504e
@ -62,8 +62,4 @@ public void seekToLast() {
|
||||
levelDBIterator.seekToLast();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prefixSeek(byte[] prefix) {
|
||||
levelDBIterator.seek(prefix);
|
||||
}
|
||||
}
|
||||
|
@ -36,9 +36,4 @@ public interface MetaStoreIterator<T> extends Iterator<T> {
|
||||
*/
|
||||
void seekToLast();
|
||||
|
||||
/**
|
||||
* seek with prefix.
|
||||
*/
|
||||
void prefixSeek(byte[] prefix);
|
||||
|
||||
}
|
||||
|
@ -63,9 +63,4 @@ public void seekToLast() {
|
||||
rocksDBIterator.seekToLast();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prefixSeek(byte[] prefix) {
|
||||
rocksDBIterator.seek(prefix);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.apache.hadoop.utils.db;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
||||
@ -145,4 +146,9 @@ <KEY, VALUE> void move(KEY sourceKey, KEY destKey, VALUE value,
|
||||
*/
|
||||
DBCheckpoint getCheckpoint(boolean flush) throws IOException;
|
||||
|
||||
/**
|
||||
* Get DB Store location.
|
||||
* @return DB file location.
|
||||
*/
|
||||
File getDbLocation();
|
||||
}
|
||||
|
@ -0,0 +1,38 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.utils.db;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
/**
|
||||
* Codec to convert Integer to/from byte array.
|
||||
*/
|
||||
public class IntegerCodec implements Codec<Integer> {
|
||||
@Override
|
||||
public byte[] toPersistedFormat(Integer object) throws IOException {
|
||||
return Ints.toByteArray(object);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer fromPersistedFormat(byte[] rawData) throws IOException {
|
||||
return Ints.fromByteArray(rawData);
|
||||
}
|
||||
}
|
@ -283,9 +283,7 @@ public DBCheckpoint getCheckpoint(boolean flush) {
|
||||
return checkPointManager.createCheckpoint(checkpointsParentDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current DB Location.
|
||||
*/
|
||||
@Override
|
||||
public File getDbLocation() {
|
||||
return dbLocation;
|
||||
}
|
||||
|
@ -164,57 +164,6 @@ public void testIterator() throws Exception {
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testIteratorPrefixSeek() throws Exception {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
|
||||
File dbDir = GenericTestUtils.getRandomizedTestDir();
|
||||
MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
|
||||
.setConf(conf)
|
||||
.setCreateIfMissing(true)
|
||||
.setDbFile(dbDir)
|
||||
.build();
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
dbStore.put(getBytes("a" + i), getBytes("a-value" + i));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
dbStore.put(getBytes("b" + i), getBytes("b-value" + i));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
dbStore.put(getBytes("c" + i), getBytes("c-value" + i));
|
||||
}
|
||||
|
||||
for (int i = 5; i < 10; i++) {
|
||||
dbStore.put(getBytes("b" + i), getBytes("b-value" + i));
|
||||
}
|
||||
|
||||
for (int i = 5; i < 10; i++) {
|
||||
dbStore.put(getBytes("a" + i), getBytes("a-value" + i));
|
||||
}
|
||||
|
||||
|
||||
MetaStoreIterator<KeyValue> metaStoreIterator = dbStore.iterator();
|
||||
metaStoreIterator.prefixSeek(getBytes("b"));
|
||||
int i = 0;
|
||||
while (metaStoreIterator.hasNext()) {
|
||||
KeyValue val = metaStoreIterator.next();
|
||||
String key = getString(val.getKey());
|
||||
if (key.startsWith("b")) {
|
||||
assertEquals("b-value" + i, getString(val.getValue()));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
assertTrue(i == 10);
|
||||
dbStore.close();
|
||||
dbStore.destroy();
|
||||
FileUtils.deleteDirectory(dbDir);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaStoreConfigDifferentFromType() throws IOException {
|
||||
|
||||
|
@ -34,4 +34,8 @@ private ReconConstants() {
|
||||
|
||||
public static final String RECON_OM_SNAPSHOT_DB =
|
||||
"om.snapshot.db";
|
||||
|
||||
public static final String CONTAINER_KEY_TABLE =
|
||||
"containerKeyTable";
|
||||
|
||||
}
|
||||
|
@ -21,11 +21,11 @@
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.ReconContainerDBProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Singleton;
|
||||
@ -38,7 +38,7 @@ public class ReconControllerModule extends AbstractModule {
|
||||
protected void configure() {
|
||||
bind(OzoneConfiguration.class).toProvider(OzoneConfigurationProvider.class);
|
||||
bind(ReconHttpServer.class).in(Singleton.class);
|
||||
bind(MetadataStore.class)
|
||||
bind(DBStore.class)
|
||||
.toProvider(ReconContainerDBProvider.class).in(Singleton.class);
|
||||
bind(ReconOMMetadataManager.class)
|
||||
.to(ReconOmMetadataManagerImpl.class).in(Singleton.class);
|
||||
|
@ -18,9 +18,21 @@
|
||||
|
||||
package org.apache.hadoop.ozone.recon;
|
||||
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hdds.cli.GenericCli;
|
||||
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -40,6 +52,9 @@
|
||||
public class ReconServer extends GenericCli {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReconServer.class);
|
||||
private final ScheduledExecutorService scheduler =
|
||||
Executors.newScheduledThreadPool(1);
|
||||
private Injector injector;
|
||||
|
||||
@Inject
|
||||
private ReconHttpServer httpServer;
|
||||
@ -53,12 +68,12 @@ public Void call() throws Exception {
|
||||
OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();
|
||||
OzoneConfigurationProvider.setConfiguration(ozoneConfiguration);
|
||||
|
||||
Injector injector = Guice.createInjector(new ReconControllerModule());
|
||||
injector = Guice.createInjector(new ReconControllerModule());
|
||||
|
||||
httpServer = injector.getInstance(ReconHttpServer.class);
|
||||
LOG.info("Starting Recon server");
|
||||
httpServer.start();
|
||||
|
||||
scheduleReconTasks();
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
try {
|
||||
stop();
|
||||
@ -69,6 +84,35 @@ public Void call() throws Exception {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule the tasks that is required by Recon to keep its metadata up to
|
||||
* date.
|
||||
*/
|
||||
private void scheduleReconTasks() {
|
||||
OzoneConfiguration configuration = injector.getInstance(
|
||||
OzoneConfiguration.class);
|
||||
ContainerDBServiceProvider containerDBServiceProvider = injector
|
||||
.getInstance(ContainerDBServiceProvider.class);
|
||||
OzoneManagerServiceProvider ozoneManagerServiceProvider = injector
|
||||
.getInstance(OzoneManagerServiceProvider.class);
|
||||
|
||||
// Schedule the task to read OM DB and write the reverse mapping to Recon
|
||||
// container DB.
|
||||
ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask(
|
||||
ozoneManagerServiceProvider, containerDBServiceProvider);
|
||||
long initialDelay = configuration.getTimeDuration(
|
||||
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
|
||||
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
long interval = configuration.getTimeDuration(
|
||||
RECON_OM_SNAPSHOT_TASK_INTERVAL,
|
||||
RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
scheduler.scheduleWithFixedDelay(containerKeyMapperTask, initialDelay,
|
||||
interval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
void stop() throws Exception {
|
||||
LOG.info("Stopping Recon server");
|
||||
httpServer.stop();
|
||||
|
@ -17,17 +17,40 @@
|
||||
*/
|
||||
package org.apache.hadoop.ozone.recon.api;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.WebApplicationException;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
|
||||
import org.apache.hadoop.ozone.recon.api.types.KeyMetadata;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
* Endpoint for querying keys that belong to a container.
|
||||
*/
|
||||
@Path("/containers")
|
||||
public class ContainerKeyService {
|
||||
|
||||
@Inject
|
||||
private ContainerDBServiceProvider containerDBServiceProvider;
|
||||
|
||||
@Inject
|
||||
private ReconOMMetadataManager omMetadataManager;
|
||||
|
||||
/**
|
||||
* Return @{@link org.apache.hadoop.ozone.recon.api.types.KeyMetadata} for
|
||||
* all keys that belong to the container identified by the id param.
|
||||
@ -37,7 +60,57 @@ public class ContainerKeyService {
|
||||
*/
|
||||
@GET
|
||||
@Path("{id}")
|
||||
public Response getKeysForContainer(@PathParam("id") String containerId) {
|
||||
return Response.ok().build();
|
||||
public Response getKeysForContainer(@PathParam("id") Long containerId) {
|
||||
Map<String, KeyMetadata> keyMetadataMap = new HashMap<>();
|
||||
try {
|
||||
Map<ContainerKeyPrefix, Integer> containerKeyPrefixMap =
|
||||
containerDBServiceProvider.getKeyPrefixesForContainer(containerId);
|
||||
|
||||
// Get set of Container-Key mappings for given containerId.
|
||||
for (ContainerKeyPrefix containerKeyPrefix : containerKeyPrefixMap
|
||||
.keySet()) {
|
||||
|
||||
// Directly calling get() on the Key table instead of iterating since
|
||||
// only full keys are supported now. When we change to using a prefix
|
||||
// of the key, this needs to change to prefix seek (TODO).
|
||||
OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(
|
||||
containerKeyPrefix.getKeyPrefix());
|
||||
if (null == omKeyInfo) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Filter keys by version.
|
||||
List<Long> matchedVersions = omKeyInfo.getKeyLocationVersions()
|
||||
.stream()
|
||||
.filter(k -> (k.getVersion() == containerKeyPrefix.getKeyVersion()))
|
||||
.mapToLong(OmKeyLocationInfoGroup::getVersion)
|
||||
.boxed()
|
||||
.collect(Collectors.toList());
|
||||
|
||||
String ozoneKey = omMetadataManager.getOzoneKey(
|
||||
omKeyInfo.getVolumeName(),
|
||||
omKeyInfo.getBucketName(),
|
||||
omKeyInfo.getKeyName());
|
||||
if (keyMetadataMap.containsKey(ozoneKey)) {
|
||||
keyMetadataMap.get(ozoneKey).getVersions().addAll(matchedVersions);
|
||||
} else {
|
||||
KeyMetadata keyMetadata = new KeyMetadata();
|
||||
keyMetadata.setBucket(omKeyInfo.getBucketName());
|
||||
keyMetadata.setVolume(omKeyInfo.getVolumeName());
|
||||
keyMetadata.setKey(omKeyInfo.getKeyName());
|
||||
keyMetadata.setCreationTime(
|
||||
Instant.ofEpochMilli(omKeyInfo.getCreationTime()));
|
||||
keyMetadata.setModificationTime(
|
||||
Instant.ofEpochMilli(omKeyInfo.getModificationTime()));
|
||||
keyMetadata.setDataSize(omKeyInfo.getDataSize());
|
||||
keyMetadata.setVersions(matchedVersions);
|
||||
keyMetadataMap.put(ozoneKey, keyMetadata);
|
||||
}
|
||||
}
|
||||
} catch (IOException ioEx) {
|
||||
throw new WebApplicationException(ioEx,
|
||||
Response.Status.INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
return Response.ok(keyMetadataMap.values()).build();
|
||||
}
|
||||
}
|
||||
|
@ -20,18 +20,30 @@
|
||||
|
||||
/**
|
||||
* Class to encapsulate the Key information needed for the Recon container DB.
|
||||
* Currently, it is containerId and key prefix.
|
||||
* Currently, it is the containerId and the whole key + key version.
|
||||
*/
|
||||
public class ContainerKeyPrefix {
|
||||
|
||||
private long containerId;
|
||||
private String keyPrefix;
|
||||
private long keyVersion = -1;
|
||||
|
||||
public ContainerKeyPrefix(long containerId, String keyPrefix) {
|
||||
this.containerId = containerId;
|
||||
this.keyPrefix = keyPrefix;
|
||||
}
|
||||
|
||||
public ContainerKeyPrefix(long containerId, String keyPrefix,
|
||||
long keyVersion) {
|
||||
this.containerId = containerId;
|
||||
this.keyPrefix = keyPrefix;
|
||||
this.keyVersion = keyVersion;
|
||||
}
|
||||
|
||||
public ContainerKeyPrefix(long containerId) {
|
||||
this.containerId = containerId;
|
||||
}
|
||||
|
||||
public long getContainerId() {
|
||||
return containerId;
|
||||
}
|
||||
@ -47,4 +59,31 @@ public String getKeyPrefix() {
|
||||
public void setKeyPrefix(String keyPrefix) {
|
||||
this.keyPrefix = keyPrefix;
|
||||
}
|
||||
|
||||
public long getKeyVersion() {
|
||||
return keyVersion;
|
||||
}
|
||||
|
||||
public void setKeyVersion(long keyVersion) {
|
||||
this.keyVersion = keyVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
|
||||
if (!(o instanceof ContainerKeyPrefix)) {
|
||||
return false;
|
||||
}
|
||||
ContainerKeyPrefix that = (ContainerKeyPrefix) o;
|
||||
return (this.containerId == that.containerId) &&
|
||||
this.keyPrefix.equals(that.keyPrefix) &&
|
||||
this.keyVersion == that.keyVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Long.valueOf(containerId).hashCode() + 13 * keyPrefix.hashCode() +
|
||||
17 * Long.valueOf(keyVersion).hashCode();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.ozone.recon.api.types;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
@ -30,21 +31,44 @@
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class KeyMetadata {
|
||||
|
||||
@XmlElement(name = "Volume")
|
||||
private String volume;
|
||||
|
||||
@XmlElement(name = "Bucket")
|
||||
private String bucket;
|
||||
|
||||
@XmlElement(name = "Key")
|
||||
private String key; // or the Object Name
|
||||
private String key;
|
||||
|
||||
@XmlElement(name = "DataSize")
|
||||
private long dataSize;
|
||||
|
||||
@XmlElement(name = "Versions")
|
||||
private List<Long> versions;
|
||||
|
||||
@XmlJavaTypeAdapter(IsoDateAdapter.class)
|
||||
@XmlElement(name = "LastModified")
|
||||
private Instant lastModified;
|
||||
@XmlElement(name = "CreationTime")
|
||||
private Instant creationTime;
|
||||
|
||||
@XmlElement(name = "ETag")
|
||||
private String eTag;
|
||||
@XmlJavaTypeAdapter(IsoDateAdapter.class)
|
||||
@XmlElement(name = "ModificationTime")
|
||||
private Instant modificationTime;
|
||||
|
||||
@XmlElement(name = "Size")
|
||||
private long size;
|
||||
public String getVolume() {
|
||||
return volume;
|
||||
}
|
||||
|
||||
@XmlElement(name = "StorageClass")
|
||||
private String storageClass;
|
||||
public void setVolume(String volume) {
|
||||
this.volume = volume;
|
||||
}
|
||||
|
||||
public String getBucket() {
|
||||
return bucket;
|
||||
}
|
||||
|
||||
public void setBucket(String bucket) {
|
||||
this.bucket = bucket;
|
||||
}
|
||||
|
||||
public String getKey() {
|
||||
return key;
|
||||
@ -54,35 +78,35 @@ public void setKey(String key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
public Instant getLastModified() {
|
||||
return lastModified;
|
||||
public long getDataSize() {
|
||||
return dataSize;
|
||||
}
|
||||
|
||||
public void setLastModified(Instant lastModified) {
|
||||
this.lastModified = lastModified;
|
||||
public void setDataSize(long dataSize) {
|
||||
this.dataSize = dataSize;
|
||||
}
|
||||
|
||||
public String getETag() {
|
||||
return eTag;
|
||||
public Instant getCreationTime() {
|
||||
return creationTime;
|
||||
}
|
||||
|
||||
public void setETag(String tag) {
|
||||
this.eTag = tag;
|
||||
public void setCreationTime(Instant creationTime) {
|
||||
this.creationTime = creationTime;
|
||||
}
|
||||
|
||||
public long getSize() {
|
||||
return size;
|
||||
public Instant getModificationTime() {
|
||||
return modificationTime;
|
||||
}
|
||||
|
||||
public void setSize(long size) {
|
||||
this.size = size;
|
||||
public void setModificationTime(Instant modificationTime) {
|
||||
this.modificationTime = modificationTime;
|
||||
}
|
||||
|
||||
public String getStorageClass() {
|
||||
return storageClass;
|
||||
public List<Long> getVersions() {
|
||||
return versions;
|
||||
}
|
||||
|
||||
public void setStorageClass(String storageClass) {
|
||||
this.storageClass = storageClass;
|
||||
public void setVersions(List<Long> versions) {
|
||||
this.versions = versions;
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,6 @@
|
||||
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.apache.hadoop.utils.db.DBStoreBuilder;
|
||||
import org.apache.hadoop.utils.db.RDBStore;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -85,8 +84,7 @@ private void initializeNewRdbStore(File dbFile) throws IOException {
|
||||
@Override
|
||||
public void updateOmDB(File newDbLocation) throws IOException {
|
||||
if (getStore() != null) {
|
||||
RDBStore rdbStore = (RDBStore) getStore();
|
||||
File oldDBLocation = rdbStore.getDbLocation();
|
||||
File oldDBLocation = getStore().getDbLocation();
|
||||
if (oldDBLocation.exists()) {
|
||||
LOG.info("Cleaning up old OM snapshot db at {}.",
|
||||
oldDBLocation.getAbsolutePath());
|
||||
|
@ -30,6 +30,16 @@
|
||||
@InterfaceStability.Unstable
|
||||
public interface ContainerDBServiceProvider {
|
||||
|
||||
/**
|
||||
* Create new container DB and bulk Store the container to Key prefix
|
||||
* mapping.
|
||||
* @param containerKeyPrefixCounts Map of containerId, key-prefix tuple to
|
||||
* key count.
|
||||
*/
|
||||
void initNewContainerDB(Map<ContainerKeyPrefix, Integer>
|
||||
containerKeyPrefixCounts)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Store the container to Key prefix mapping into the Recon Container DB.
|
||||
*
|
||||
@ -54,5 +64,6 @@ Integer getCountForForContainerKeyPrefix(
|
||||
* @param containerId the given containerId.
|
||||
* @return Map of Key prefix -> count.
|
||||
*/
|
||||
Map<String, Integer> getKeyPrefixesForContainer(long containerId);
|
||||
Map<ContainerKeyPrefix, Integer> getKeyPrefixesForContainer(long containerId)
|
||||
throws IOException;
|
||||
}
|
||||
|
@ -28,9 +28,14 @@
|
||||
public interface OzoneManagerServiceProvider {
|
||||
|
||||
/**
|
||||
* Start taking OM Snapshots.
|
||||
* Initialize Ozone Manager Service Provider Impl.
|
||||
*/
|
||||
void start() throws IOException;
|
||||
void init() throws IOException;
|
||||
|
||||
/**
|
||||
* Update Recon OM DB with new snapshot from OM.
|
||||
*/
|
||||
void updateReconOmDBWithNewSnapshot() throws IOException;
|
||||
|
||||
/**
|
||||
* Return instance of OM Metadata manager.
|
||||
|
@ -18,27 +18,28 @@
|
||||
|
||||
package org.apache.hadoop.ozone.recon.spi.impl;
|
||||
|
||||
import static org.apache.commons.compress.utils.CharsetNames.UTF_8;
|
||||
import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_TABLE;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
|
||||
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
|
||||
import org.apache.hadoop.utils.MetaStoreIterator;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.apache.hadoop.utils.db.Table;
|
||||
import org.apache.hadoop.utils.db.Table.KeyValue;
|
||||
import org.apache.hadoop.utils.db.TableIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.primitives.Longs;
|
||||
|
||||
/**
|
||||
* Implementation of the Recon Container DB Service.
|
||||
*/
|
||||
@ -48,10 +49,52 @@ public class ContainerDBServiceProviderImpl
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ContainerDBServiceProviderImpl.class);
|
||||
private final static String KEY_DELIMITER = "_";
|
||||
|
||||
private Table<ContainerKeyPrefix, Integer> containerKeyTable;
|
||||
|
||||
@Inject
|
||||
private MetadataStore containerDBStore;
|
||||
private OzoneConfiguration configuration;
|
||||
|
||||
@Inject
|
||||
private DBStore containerDbStore;
|
||||
|
||||
@Inject
|
||||
public ContainerDBServiceProviderImpl(DBStore dbStore) {
|
||||
try {
|
||||
this.containerKeyTable = dbStore.getTable(CONTAINER_KEY_TABLE,
|
||||
ContainerKeyPrefix.class, Integer.class);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to create Container Key Table. " + e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a new container DB instance, getting rid of the old instance
|
||||
* and then storing the passed in container prefix counts into the created
|
||||
* DB instance.
|
||||
* @param containerKeyPrefixCounts Map of containerId, key-prefix tuple to
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void initNewContainerDB(Map<ContainerKeyPrefix, Integer>
|
||||
containerKeyPrefixCounts)
|
||||
throws IOException {
|
||||
|
||||
File oldDBLocation = containerDbStore.getDbLocation();
|
||||
containerDbStore = ReconContainerDBProvider.getNewDBStore(configuration);
|
||||
containerKeyTable = containerDbStore.getTable(CONTAINER_KEY_TABLE,
|
||||
ContainerKeyPrefix.class, Integer.class);
|
||||
|
||||
if (oldDBLocation.exists()) {
|
||||
LOG.info("Cleaning up old Recon Container DB at {}.",
|
||||
oldDBLocation.getAbsolutePath());
|
||||
FileUtils.deleteQuietly(oldDBLocation);
|
||||
}
|
||||
for (Map.Entry<ContainerKeyPrefix, Integer> entry :
|
||||
containerKeyPrefixCounts.entrySet()) {
|
||||
containerKeyTable.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Concatenate the containerId and Key Prefix using a delimiter and store the
|
||||
@ -65,13 +108,7 @@ public class ContainerDBServiceProviderImpl
|
||||
public void storeContainerKeyMapping(ContainerKeyPrefix containerKeyPrefix,
|
||||
Integer count)
|
||||
throws IOException {
|
||||
byte[] containerIdBytes = Longs.toByteArray(containerKeyPrefix
|
||||
.getContainerId());
|
||||
byte[] keyPrefixBytes = (KEY_DELIMITER + containerKeyPrefix.getKeyPrefix())
|
||||
.getBytes(UTF_8);
|
||||
byte[] dbKey = ArrayUtils.addAll(containerIdBytes, keyPrefixBytes);
|
||||
byte[] dbValue = ByteBuffer.allocate(Integer.BYTES).putInt(count).array();
|
||||
containerDBStore.put(dbKey, dbValue);
|
||||
containerKeyTable.put(containerKeyPrefix, count);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -85,13 +122,8 @@ public void storeContainerKeyMapping(ContainerKeyPrefix containerKeyPrefix,
|
||||
@Override
|
||||
public Integer getCountForForContainerKeyPrefix(
|
||||
ContainerKeyPrefix containerKeyPrefix) throws IOException {
|
||||
byte[] containerIdBytes = Longs.toByteArray(containerKeyPrefix
|
||||
.getContainerId());
|
||||
byte[] keyPrefixBytes = (KEY_DELIMITER + containerKeyPrefix
|
||||
.getKeyPrefix()).getBytes(UTF_8);
|
||||
byte[] dbKey = ArrayUtils.addAll(containerIdBytes, keyPrefixBytes);
|
||||
byte[] dbValue = containerDBStore.get(dbKey);
|
||||
return ByteBuffer.wrap(dbValue).getInt();
|
||||
Integer count = containerKeyTable.get(containerKeyPrefix);
|
||||
return count == null ? Integer.valueOf(0) : count;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -102,31 +134,27 @@ public Integer getCountForForContainerKeyPrefix(
|
||||
* @return Map of (Key-Prefix,Count of Keys).
|
||||
*/
|
||||
@Override
|
||||
public Map<String, Integer> getKeyPrefixesForContainer(long containerId) {
|
||||
public Map<ContainerKeyPrefix, Integer> getKeyPrefixesForContainer(
|
||||
long containerId) throws IOException {
|
||||
|
||||
Map<String, Integer> prefixes = new HashMap<>();
|
||||
MetaStoreIterator<MetadataStore.KeyValue> containerIterator =
|
||||
containerDBStore.iterator();
|
||||
byte[] containerIdPrefixBytes = Longs.toByteArray(containerId);
|
||||
containerIterator.prefixSeek(containerIdPrefixBytes);
|
||||
Map<ContainerKeyPrefix, Integer> prefixes = new HashMap<>();
|
||||
TableIterator<ContainerKeyPrefix, ? extends KeyValue<ContainerKeyPrefix,
|
||||
Integer>> containerIterator = containerKeyTable.iterator();
|
||||
containerIterator.seek(new ContainerKeyPrefix(containerId));
|
||||
while (containerIterator.hasNext()) {
|
||||
MetadataStore.KeyValue keyValue = containerIterator.next();
|
||||
byte[] containerKey = keyValue.getKey();
|
||||
long containerIdFromDB = ByteBuffer.wrap(ArrayUtils.subarray(
|
||||
containerKey, 0, Long.BYTES)).getLong();
|
||||
|
||||
KeyValue<ContainerKeyPrefix, Integer> keyValue = containerIterator.next();
|
||||
ContainerKeyPrefix containerKeyPrefix = keyValue.getKey();
|
||||
//The prefix seek only guarantees that the iterator's head will be
|
||||
// positioned at the first prefix match. We still have to check the key
|
||||
// prefix.
|
||||
if (containerIdFromDB == containerId) {
|
||||
byte[] keyPrefix = ArrayUtils.subarray(containerKey,
|
||||
containerIdPrefixBytes.length + 1,
|
||||
containerKey.length);
|
||||
try {
|
||||
prefixes.put(new String(keyPrefix, UTF_8),
|
||||
ByteBuffer.wrap(keyValue.getValue()).getInt());
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
LOG.warn("Unable to read key prefix from container DB.", e);
|
||||
if (containerKeyPrefix.getContainerId() == containerId) {
|
||||
if (StringUtils.isNotEmpty(containerKeyPrefix.getKeyPrefix())) {
|
||||
prefixes.put(new ContainerKeyPrefix(containerId,
|
||||
containerKeyPrefix.getKeyPrefix(),
|
||||
containerKeyPrefix.getKeyVersion()),
|
||||
keyValue.getValue());
|
||||
} else {
|
||||
LOG.warn("Null key prefix returned for containerId = " + containerId);
|
||||
}
|
||||
} else {
|
||||
break; //Break when the first mismatch occurs.
|
||||
|
@ -0,0 +1,87 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.spi.impl;
|
||||
|
||||
import static org.apache.commons.compress.utils.CharsetNames.UTF_8;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
|
||||
import org.apache.hadoop.utils.db.Codec;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Longs;
|
||||
|
||||
/**
|
||||
* Codec to encode ContainerKeyPrefix as byte array.
|
||||
*/
|
||||
public class ContainerKeyPrefixCodec implements Codec<ContainerKeyPrefix>{
|
||||
|
||||
private final static String KEY_DELIMITER = "_";
|
||||
|
||||
@Override
|
||||
public byte[] toPersistedFormat(ContainerKeyPrefix containerKeyPrefix)
|
||||
throws IOException {
|
||||
Preconditions.checkNotNull(containerKeyPrefix,
|
||||
"Null object can't be converted to byte array.");
|
||||
byte[] containerIdBytes = Longs.toByteArray(containerKeyPrefix
|
||||
.getContainerId());
|
||||
|
||||
//Prefix seek can be done only with containerId. In that case, we can
|
||||
// expect the key and version to be undefined.
|
||||
if (StringUtils.isNotEmpty(containerKeyPrefix.getKeyPrefix())) {
|
||||
byte[] keyPrefixBytes = (KEY_DELIMITER +
|
||||
containerKeyPrefix.getKeyPrefix()).getBytes(UTF_8);
|
||||
containerIdBytes = ArrayUtils.addAll(containerIdBytes, keyPrefixBytes);
|
||||
}
|
||||
|
||||
if (containerKeyPrefix.getKeyVersion() != -1) {
|
||||
containerIdBytes = ArrayUtils.addAll(containerIdBytes, KEY_DELIMITER
|
||||
.getBytes(UTF_8));
|
||||
containerIdBytes = ArrayUtils.addAll(containerIdBytes, Longs.toByteArray(
|
||||
containerKeyPrefix.getKeyVersion()));
|
||||
}
|
||||
return containerIdBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerKeyPrefix fromPersistedFormat(byte[] rawData)
|
||||
throws IOException {
|
||||
|
||||
// First 8 bytes is the containerId.
|
||||
long containerIdFromDB = ByteBuffer.wrap(ArrayUtils.subarray(
|
||||
rawData, 0, Long.BYTES)).getLong();
|
||||
// When reading from byte[], we can always expect to have the containerId,
|
||||
// key and version parts in the byte array.
|
||||
byte[] keyBytes = ArrayUtils.subarray(rawData,
|
||||
Long.BYTES + 1,
|
||||
rawData.length - Long.BYTES - 1);
|
||||
String keyPrefix = new String(keyBytes, UTF_8);
|
||||
|
||||
// Last 8 bytes is the key version.
|
||||
byte[] versionBytes = ArrayUtils.subarray(rawData,
|
||||
rawData.length - Long.BYTES,
|
||||
rawData.length);
|
||||
long version = ByteBuffer.wrap(versionBytes).getLong();
|
||||
return new ContainerKeyPrefix(containerIdFromDB, keyPrefix, version);
|
||||
}
|
||||
}
|
@ -27,10 +27,6 @@
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
|
||||
@ -42,8 +38,6 @@
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.inject.Inject;
|
||||
@ -75,7 +69,6 @@ public class OzoneManagerServiceProviderImpl
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(OzoneManagerServiceProviderImpl.class);
|
||||
|
||||
private ScheduledExecutorService executorService;
|
||||
private final String dbCheckpointEndPoint = "/dbCheckpoint";
|
||||
private final CloseableHttpClient httpClient;
|
||||
private File omSnapshotDBParentDir = null;
|
||||
@ -89,7 +82,6 @@ public class OzoneManagerServiceProviderImpl
|
||||
|
||||
@Inject
|
||||
public OzoneManagerServiceProviderImpl(OzoneConfiguration configuration) {
|
||||
executorService = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
String ozoneManagerHttpAddress = configuration.get(OMConfigKeys
|
||||
.OZONE_OM_HTTP_ADDRESS_KEY);
|
||||
@ -141,21 +133,14 @@ public OzoneManagerServiceProviderImpl(OzoneConfiguration configuration) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws IOException {
|
||||
public void init() throws IOException {
|
||||
updateReconOmDBWithNewSnapshot();
|
||||
}
|
||||
|
||||
//Schedule a task to periodically obtain the DB snapshot from OM and
|
||||
@Override
|
||||
public void updateReconOmDBWithNewSnapshot() throws IOException {
|
||||
//Obtain the current DB snapshot from OM and
|
||||
//update the in house OM metadata managed DB instance.
|
||||
long initialDelay = configuration.getTimeDuration(
|
||||
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
|
||||
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
long interval = configuration.getTimeDuration(
|
||||
RECON_OM_SNAPSHOT_TASK_INTERVAL,
|
||||
RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
LOG.info("Starting thread to get OM DB Snapshot.");
|
||||
executorService.scheduleAtFixedRate(() -> {
|
||||
DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
|
||||
if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
|
||||
try {
|
||||
@ -165,10 +150,8 @@ public void start() throws IOException {
|
||||
LOG.error("Unable to refresh Recon OM DB Snapshot. ", e);
|
||||
}
|
||||
} else {
|
||||
LOG.error("Null snapshot got from OM, {}",
|
||||
dbSnapshot.getCheckpointLocation());
|
||||
LOG.error("Null snapshot location got from OM.");
|
||||
}
|
||||
}, initialDelay, interval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -16,23 +16,20 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.spi;
|
||||
package org.apache.hadoop.ozone.recon.spi.impl;
|
||||
|
||||
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_CONTAINER_DB;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_CACHE_SIZE_MB;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_STORE_IMPL;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_STORE_IMPL_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_TABLE;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
|
||||
import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.apache.hadoop.utils.MetadataStoreBuilder;
|
||||
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.apache.hadoop.utils.db.DBStoreBuilder;
|
||||
import org.apache.hadoop.utils.db.IntegerCodec;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -44,8 +41,7 @@
|
||||
/**
|
||||
* Provider for the Recon container DB (Metadata store).
|
||||
*/
|
||||
public class ReconContainerDBProvider implements
|
||||
Provider<MetadataStore> {
|
||||
public class ReconContainerDBProvider implements Provider<DBStore> {
|
||||
|
||||
@VisibleForTesting
|
||||
private static final Logger LOG =
|
||||
@ -55,30 +51,30 @@ public class ReconContainerDBProvider implements
|
||||
private OzoneConfiguration configuration;
|
||||
|
||||
@Override
|
||||
public MetadataStore get() {
|
||||
File metaDir = getReconDbDir(configuration, OZONE_RECON_DB_DIR);
|
||||
File containerDBPath = new File(metaDir,
|
||||
RECON_CONTAINER_DB);
|
||||
int cacheSize = configuration.getInt(OZONE_RECON_CONTAINER_DB_CACHE_SIZE_MB,
|
||||
OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT);
|
||||
|
||||
String dbType = configuration.get(OZONE_RECON_CONTAINER_DB_STORE_IMPL,
|
||||
OZONE_RECON_CONTAINER_DB_STORE_IMPL_DEFAULT);
|
||||
MetadataStore metadataStore = null;
|
||||
try {
|
||||
metadataStore = MetadataStoreBuilder.newBuilder()
|
||||
.setConf(configuration)
|
||||
.setDBType(dbType)
|
||||
.setDbFile(containerDBPath)
|
||||
.setCacheSize(cacheSize * OzoneConsts.MB)
|
||||
.build();
|
||||
} catch (IOException ioEx) {
|
||||
LOG.error("Unable to initialize Recon container metadata store.", ioEx);
|
||||
}
|
||||
if (metadataStore == null) {
|
||||
throw new ProvisionException("Unable to provide instance of Metadata " +
|
||||
public DBStore get() {
|
||||
DBStore dbStore = getNewDBStore(configuration);
|
||||
if (dbStore == null) {
|
||||
throw new ProvisionException("Unable to provide instance of DBStore " +
|
||||
"store.");
|
||||
}
|
||||
return metadataStore;
|
||||
return dbStore;
|
||||
}
|
||||
|
||||
public static DBStore getNewDBStore(OzoneConfiguration configuration) {
|
||||
DBStore dbStore = null;
|
||||
String dbName = RECON_CONTAINER_DB + "_" + System.currentTimeMillis();
|
||||
try {
|
||||
Path metaDir = getReconDbDir(configuration, OZONE_RECON_DB_DIR).toPath();
|
||||
dbStore = DBStoreBuilder.newBuilder(configuration)
|
||||
.setPath(metaDir)
|
||||
.setName(dbName)
|
||||
.addTable(CONTAINER_KEY_TABLE)
|
||||
.addCodec(ContainerKeyPrefix.class, new ContainerKeyPrefixCodec())
|
||||
.addCodec(Integer.class, new IntegerCodec())
|
||||
.build();
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Unable to initialize Recon container metadata store.", ex);
|
||||
}
|
||||
return dbStore;
|
||||
}
|
||||
}
|
@ -0,0 +1,107 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.tasks;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
|
||||
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
|
||||
import org.apache.hadoop.utils.db.Table;
|
||||
import org.apache.hadoop.utils.db.TableIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Class to iterate over the OM DB and populate the Recon container DB with
|
||||
* the container -> Key reverse mapping.
|
||||
*/
|
||||
public class ContainerKeyMapperTask implements Runnable {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ContainerKeyMapperTask.class);
|
||||
|
||||
private OzoneManagerServiceProvider ozoneManagerServiceProvider;
|
||||
private ContainerDBServiceProvider containerDBServiceProvider;
|
||||
|
||||
public ContainerKeyMapperTask(
|
||||
OzoneManagerServiceProvider ozoneManagerServiceProvider,
|
||||
ContainerDBServiceProvider containerDBServiceProvider) {
|
||||
this.ozoneManagerServiceProvider = ozoneManagerServiceProvider;
|
||||
this.containerDBServiceProvider = containerDBServiceProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read Key -> ContainerId data from OM snapshot DB and write reverse map
|
||||
* (container, key) -> count to Recon Container DB.
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
int omKeyCount = 0;
|
||||
int containerCount = 0;
|
||||
try {
|
||||
LOG.info("Starting a run of ContainerKeyMapperTask.");
|
||||
Instant start = Instant.now();
|
||||
|
||||
//Update OM DB Snapshot.
|
||||
ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
|
||||
|
||||
OMMetadataManager omMetadataManager = ozoneManagerServiceProvider
|
||||
.getOMMetadataManagerInstance();
|
||||
Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable();
|
||||
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
|
||||
keyIter = omKeyInfoTable.iterator()) {
|
||||
while (keyIter.hasNext()) {
|
||||
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
|
||||
StringBuilder key = new StringBuilder(kv.getKey());
|
||||
OmKeyInfo omKeyInfo = kv.getValue();
|
||||
for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo
|
||||
.getKeyLocationVersions()) {
|
||||
long keyVersion = omKeyLocationInfoGroup.getVersion();
|
||||
for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup
|
||||
.getLocationList()) {
|
||||
long containerId = omKeyLocationInfo.getContainerID();
|
||||
ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix(
|
||||
containerId, key.toString(), keyVersion);
|
||||
containerDBServiceProvider.storeContainerKeyMapping(
|
||||
containerKeyPrefix, 1);
|
||||
containerCount++;
|
||||
}
|
||||
}
|
||||
omKeyCount++;
|
||||
}
|
||||
}
|
||||
LOG.info("Completed the run of ContainerKeyMapperTask.");
|
||||
Instant end = Instant.now();
|
||||
long duration = Duration.between(start, end).toMillis();
|
||||
LOG.info("It took me " + (double)duration / 1000.0 + " seconds to " +
|
||||
"process " + omKeyCount + " keys and " + containerCount + " " +
|
||||
"containers.");
|
||||
} catch (IOException ioEx) {
|
||||
LOG.error("Unable to populate Container Key Prefix data in Recon DB. ",
|
||||
ioEx);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* The classes in this package contains the various scheduled tasks used by
|
||||
* Recon.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.recon.tasks;
|
@ -0,0 +1,172 @@
|
||||
package org.apache.hadoop.ozone.recon;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||
import org.apache.hadoop.ozone.om.BucketManager;
|
||||
import org.apache.hadoop.ozone.om.BucketManagerImpl;
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
|
||||
import org.apache.hadoop.utils.db.DBCheckpoint;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
/**
|
||||
* Utility methods for test classes.
|
||||
*/
|
||||
public abstract class AbstractOMMetadataManagerTest {
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
/**
|
||||
* Create a new OM Metadata manager instance.
|
||||
* @throws IOException ioEx
|
||||
*/
|
||||
protected OMMetadataManager initializeNewOmMetadataManager()
|
||||
throws IOException {
|
||||
File omDbDir = temporaryFolder.newFolder();
|
||||
OzoneConfiguration omConfiguration = new OzoneConfiguration();
|
||||
omConfiguration.set(OZONE_OM_DB_DIRS,
|
||||
omDbDir.getAbsolutePath());
|
||||
OMMetadataManager omMetadataManager = new OmMetadataManagerImpl(
|
||||
omConfiguration);
|
||||
|
||||
String volumeKey = omMetadataManager.getVolumeKey("sampleVol");
|
||||
OmVolumeArgs args =
|
||||
OmVolumeArgs.newBuilder()
|
||||
.setVolume("sampleVol")
|
||||
.setAdminName("TestUser")
|
||||
.setOwnerName("TestUser")
|
||||
.build();
|
||||
omMetadataManager.getVolumeTable().put(volumeKey, args);
|
||||
|
||||
BucketManager bucketManager = new BucketManagerImpl(omMetadataManager);
|
||||
OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
|
||||
.setVolumeName("sampleVol")
|
||||
.setBucketName("bucketOne")
|
||||
.build();
|
||||
bucketManager.createBucket(bucketInfo);
|
||||
|
||||
return omMetadataManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an instance of Recon OM Metadata manager.
|
||||
* @return ReconOMMetadataManager
|
||||
* @throws IOException when creating the RocksDB instance.
|
||||
*/
|
||||
protected ReconOMMetadataManager getTestMetadataManager(
|
||||
OMMetadataManager omMetadataManager)
|
||||
throws IOException {
|
||||
|
||||
DBCheckpoint checkpoint = omMetadataManager.getStore()
|
||||
.getCheckpoint(true);
|
||||
assertNotNull(checkpoint.getCheckpointLocation());
|
||||
|
||||
File reconOmDbDir = temporaryFolder.newFolder();
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, reconOmDbDir
|
||||
.getAbsolutePath());
|
||||
|
||||
ReconOMMetadataManager reconOMMetaMgr =
|
||||
new ReconOmMetadataManagerImpl(configuration);
|
||||
reconOMMetaMgr.start(configuration);
|
||||
|
||||
reconOMMetaMgr.updateOmDB(
|
||||
checkpoint.getCheckpointLocation().toFile());
|
||||
return reconOMMetaMgr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a key to OM instance.
|
||||
* @throws IOException while writing.
|
||||
*/
|
||||
public void writeDataToOm(OMMetadataManager omMetadataManager,
|
||||
String key) throws IOException {
|
||||
|
||||
String omKey = omMetadataManager.getOzoneKey("sampleVol",
|
||||
"bucketOne", key);
|
||||
|
||||
omMetadataManager.getKeyTable().put(omKey,
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName(key)
|
||||
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a key to OM instance.
|
||||
* @throws IOException while writing.
|
||||
*/
|
||||
protected void writeDataToOm(OMMetadataManager omMetadataManager,
|
||||
String key,
|
||||
String bucket,
|
||||
String volume,
|
||||
List<OmKeyLocationInfoGroup>
|
||||
omKeyLocationInfoGroupList)
|
||||
throws IOException {
|
||||
|
||||
String omKey = omMetadataManager.getOzoneKey(volume,
|
||||
bucket, key);
|
||||
|
||||
omMetadataManager.getKeyTable().put(omKey,
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName(bucket)
|
||||
.setVolumeName(volume)
|
||||
.setKeyName(key)
|
||||
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.setOmKeyLocationInfos(omKeyLocationInfoGroupList)
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return random pipeline.
|
||||
* @return pipeline
|
||||
*/
|
||||
protected Pipeline getRandomPipeline() {
|
||||
return Pipeline.newBuilder()
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setId(PipelineID.randomId())
|
||||
.setNodes(Collections.EMPTY_LIST)
|
||||
.setState(Pipeline.PipelineState.OPEN)
|
||||
.setType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get new OmKeyLocationInfo for given BlockID and Pipeline.
|
||||
* @param blockID blockId
|
||||
* @param pipeline pipeline
|
||||
* @return new instance of OmKeyLocationInfo
|
||||
*/
|
||||
protected OmKeyLocationInfo getOmKeyLocationInfo(BlockID blockID,
|
||||
Pipeline pipeline) {
|
||||
return new OmKeyLocationInfo.Builder()
|
||||
.setBlockID(blockID)
|
||||
.setPipeline(pipeline)
|
||||
.build();
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.ContainerKeyPrefixCodec;
|
||||
import org.apache.hadoop.utils.db.Codec;
|
||||
import org.apache.hadoop.utils.db.IntegerCodec;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Unit Tests for Codecs used in Recon.
|
||||
*/
|
||||
public class TestReconCodecs {
|
||||
|
||||
@Test
|
||||
public void testContainerKeyPrefixCodec() throws IOException {
|
||||
ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix(
|
||||
System.currentTimeMillis(), "TestKeyPrefix", 0);
|
||||
|
||||
Codec<ContainerKeyPrefix> codec = new ContainerKeyPrefixCodec();
|
||||
byte[] persistedFormat = codec.toPersistedFormat(containerKeyPrefix);
|
||||
Assert.assertTrue(persistedFormat != null);
|
||||
ContainerKeyPrefix fromPersistedFormat =
|
||||
codec.fromPersistedFormat(persistedFormat);
|
||||
Assert.assertEquals(containerKeyPrefix, fromPersistedFormat);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIntegerCodec() throws IOException {
|
||||
Integer i = 1000;
|
||||
Codec<Integer> codec = new IntegerCodec();
|
||||
byte[] persistedFormat = codec.toPersistedFormat(i);
|
||||
Assert.assertTrue(persistedFormat != null);
|
||||
Integer fromPersistedFormat =
|
||||
codec.fromPersistedFormat(persistedFormat);
|
||||
Assert.assertEquals(i, fromPersistedFormat);
|
||||
}
|
||||
}
|
@ -63,7 +63,7 @@ public void testGetReconDbDir() throws Exception {
|
||||
|
||||
File file = ReconUtils.getReconDbDir(configuration,
|
||||
"TEST_DB_DIR");
|
||||
Assert.assertEquals(file.getAbsolutePath(), filePath);
|
||||
Assert.assertEquals(filePath, file.getAbsolutePath());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -129,7 +129,7 @@ public int read() throws IOException {
|
||||
InputStream inputStream = ReconUtils.makeHttpCall(httpClientMock, url);
|
||||
String contents = IOUtils.toString(inputStream, Charset.defaultCharset());
|
||||
|
||||
assertEquals(contents, "File 1 Contents");
|
||||
assertEquals("File 1 Contents", contents);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,216 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.api;
|
||||
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
|
||||
import org.apache.hadoop.ozone.recon.ReconUtils;
|
||||
import org.apache.hadoop.ozone.recon.api.types.KeyMetadata;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
|
||||
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
|
||||
import org.apache.hadoop.utils.db.DBCheckpoint;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.powermock.api.mockito.PowerMockito;
|
||||
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
||||
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
/**
|
||||
* Test for container key service.
|
||||
*/
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
|
||||
@PrepareForTest(ReconUtils.class)
|
||||
public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
|
||||
|
||||
private ContainerDBServiceProvider containerDbServiceProvider;
|
||||
private OMMetadataManager omMetadataManager;
|
||||
private ReconOMMetadataManager reconOMMetadataManager;
|
||||
private Injector injector;
|
||||
private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
|
||||
private ContainerKeyService containerKeyService = new ContainerKeyService();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
omMetadataManager = initializeNewOmMetadataManager();
|
||||
injector = Guice.createInjector(new AbstractModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
try {
|
||||
bind(OzoneConfiguration.class).toInstance(
|
||||
getTestOzoneConfiguration());
|
||||
reconOMMetadataManager = getTestMetadataManager(omMetadataManager);
|
||||
bind(ReconOMMetadataManager.class).toInstance(reconOMMetadataManager);
|
||||
bind(DBStore.class).toProvider(ReconContainerDBProvider.class).
|
||||
in(Singleton.class);
|
||||
bind(ContainerDBServiceProvider.class).to(
|
||||
ContainerDBServiceProviderImpl.class).in(Singleton.class);
|
||||
bind(ContainerKeyService.class).toInstance(containerKeyService);
|
||||
ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
|
||||
getTestOzoneConfiguration());
|
||||
bind(OzoneManagerServiceProvider.class)
|
||||
.toInstance(ozoneManagerServiceProvider);
|
||||
} catch (IOException e) {
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
});
|
||||
containerDbServiceProvider = injector.getInstance(
|
||||
ContainerDBServiceProvider.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeysForContainer() throws Exception {
|
||||
|
||||
//Write Data to OM
|
||||
Pipeline pipeline = getRandomPipeline();
|
||||
|
||||
List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
|
||||
BlockID blockID1 = new BlockID(1, 1);
|
||||
OmKeyLocationInfo omKeyLocationInfo1 = getOmKeyLocationInfo(blockID1,
|
||||
pipeline);
|
||||
omKeyLocationInfoList.add(omKeyLocationInfo1);
|
||||
|
||||
BlockID blockID2 = new BlockID(2, 1);
|
||||
OmKeyLocationInfo omKeyLocationInfo2 = getOmKeyLocationInfo(blockID2,
|
||||
pipeline);
|
||||
omKeyLocationInfoList.add(omKeyLocationInfo2);
|
||||
|
||||
OmKeyLocationInfoGroup omKeyLocationInfoGroup = new
|
||||
OmKeyLocationInfoGroup(0, omKeyLocationInfoList);
|
||||
|
||||
//key = key_one, Blocks = [ {CID = 1, LID = 1}, {CID = 2, LID = 1} ]
|
||||
writeDataToOm(omMetadataManager,
|
||||
"key_one", "bucketOne", "sampleVol",
|
||||
Collections.singletonList(omKeyLocationInfoGroup));
|
||||
|
||||
List<OmKeyLocationInfoGroup> infoGroups = new ArrayList<>();
|
||||
BlockID blockID3 = new BlockID(1, 2);
|
||||
OmKeyLocationInfo omKeyLocationInfo3 = getOmKeyLocationInfo(blockID3,
|
||||
pipeline);
|
||||
|
||||
List<OmKeyLocationInfo> omKeyLocationInfoListNew = new ArrayList<>();
|
||||
omKeyLocationInfoListNew.add(omKeyLocationInfo3);
|
||||
infoGroups.add(new OmKeyLocationInfoGroup(0,
|
||||
omKeyLocationInfoListNew));
|
||||
|
||||
BlockID blockID4 = new BlockID(1, 3);
|
||||
OmKeyLocationInfo omKeyLocationInfo4 = getOmKeyLocationInfo(blockID4,
|
||||
pipeline);
|
||||
|
||||
omKeyLocationInfoListNew = new ArrayList<>();
|
||||
omKeyLocationInfoListNew.add(omKeyLocationInfo4);
|
||||
infoGroups.add(new OmKeyLocationInfoGroup(1,
|
||||
omKeyLocationInfoListNew));
|
||||
|
||||
//key = key_two, Blocks = [ {CID = 1, LID = 2}, {CID = 1, LID = 3} ]
|
||||
writeDataToOm(omMetadataManager,
|
||||
"key_two", "bucketOne", "sampleVol", infoGroups);
|
||||
|
||||
//Take snapshot of OM DB and copy over to Recon OM DB.
|
||||
DBCheckpoint checkpoint = omMetadataManager.getStore()
|
||||
.getCheckpoint(true);
|
||||
File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
|
||||
InputStream inputStream = new FileInputStream(tarFile);
|
||||
PowerMockito.stub(PowerMockito.method(ReconUtils.class,
|
||||
"makeHttpCall",
|
||||
CloseableHttpClient.class, String.class))
|
||||
.toReturn(inputStream);
|
||||
|
||||
//Generate Recon container DB data.
|
||||
ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask(
|
||||
ozoneManagerServiceProvider, containerDbServiceProvider);
|
||||
containerKeyMapperTask.run();
|
||||
|
||||
Response response = containerKeyService.getKeysForContainer(1L);
|
||||
|
||||
Collection<KeyMetadata> keyMetadataList =
|
||||
(Collection<KeyMetadata>) response.getEntity();
|
||||
assertTrue(keyMetadataList.size() == 2);
|
||||
|
||||
Iterator<KeyMetadata> iterator = keyMetadataList.iterator();
|
||||
|
||||
KeyMetadata keyMetadata = iterator.next();
|
||||
assertTrue(keyMetadata.getKey().equals("key_one"));
|
||||
assertTrue(keyMetadata.getVersions().size() == 1);
|
||||
|
||||
keyMetadata = iterator.next();
|
||||
assertTrue(keyMetadata.getKey().equals("key_two"));
|
||||
assertTrue(keyMetadata.getVersions().size() == 2);
|
||||
assertTrue(keyMetadata.getVersions().contains(0L) && keyMetadata
|
||||
.getVersions().contains(1L));
|
||||
|
||||
response = containerKeyService.getKeysForContainer(3L);
|
||||
keyMetadataList = (Collection<KeyMetadata>) response.getEntity();
|
||||
assertTrue(keyMetadataList.isEmpty());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Test OzoneConfiguration instance.
|
||||
* @return OzoneConfiguration
|
||||
* @throws IOException ioEx.
|
||||
*/
|
||||
private OzoneConfiguration getTestOzoneConfiguration()
|
||||
throws IOException {
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR,
|
||||
temporaryFolder.newFolder().getAbsolutePath());
|
||||
configuration.set(OZONE_RECON_DB_DIR, temporaryFolder.newFolder()
|
||||
.getAbsolutePath());
|
||||
return configuration;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* The classes in this package test the Rest API layer of Recon.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.recon.api;
|
@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hadoop.ozone.recon.spi.impl;
|
||||
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
@ -28,10 +30,9 @@
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
|
||||
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
|
||||
import org.apache.hadoop.utils.MetaStoreIterator;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.apache.hadoop.utils.MetadataStoreBuilder;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -40,6 +41,7 @@
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
/**
|
||||
* Unit Tests for ContainerDBServiceProviderImpl.
|
||||
@ -49,28 +51,27 @@ public class TestContainerDBServiceProviderImpl {
|
||||
@Rule
|
||||
public TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
private MetadataStore containerDBStore;
|
||||
private ContainerDBServiceProvider containerDbServiceProvider
|
||||
= new ContainerDBServiceProviderImpl();
|
||||
private ContainerDBServiceProvider containerDbServiceProvider;
|
||||
private Injector injector;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
tempFolder.create();
|
||||
File dbDir = tempFolder.getRoot();
|
||||
containerDBStore = MetadataStoreBuilder.newBuilder()
|
||||
.setConf(new OzoneConfiguration())
|
||||
.setCreateIfMissing(true)
|
||||
.setDbFile(dbDir)
|
||||
.build();
|
||||
injector = Guice.createInjector(new AbstractModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(MetadataStore.class).toInstance(containerDBStore);
|
||||
bind(ContainerDBServiceProvider.class)
|
||||
.toInstance(containerDbServiceProvider);
|
||||
File dbDir = tempFolder.getRoot();
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(OZONE_RECON_DB_DIR, dbDir.getAbsolutePath());
|
||||
bind(OzoneConfiguration.class).toInstance(configuration);
|
||||
bind(DBStore.class).toProvider(ReconContainerDBProvider.class).in(
|
||||
Singleton.class);
|
||||
bind(ContainerDBServiceProvider.class).to(
|
||||
ContainerDBServiceProviderImpl.class).in(Singleton.class);
|
||||
}
|
||||
});
|
||||
containerDbServiceProvider = injector.getInstance(
|
||||
ContainerDBServiceProvider.class);
|
||||
}
|
||||
|
||||
@After
|
||||
@ -78,6 +79,55 @@ public void tearDown() throws Exception {
|
||||
tempFolder.delete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitNewContainerDB() throws Exception {
|
||||
long containerId = System.currentTimeMillis();
|
||||
Map<ContainerKeyPrefix, Integer> prefixCounts = new HashMap<>();
|
||||
|
||||
ContainerKeyPrefix ckp1 = new ContainerKeyPrefix(containerId,
|
||||
"V1/B1/K1", 0);
|
||||
prefixCounts.put(ckp1, 1);
|
||||
|
||||
ContainerKeyPrefix ckp2 = new ContainerKeyPrefix(containerId,
|
||||
"V1/B1/K2", 0);
|
||||
prefixCounts.put(ckp2, 2);
|
||||
|
||||
ContainerKeyPrefix ckp3 = new ContainerKeyPrefix(containerId,
|
||||
"V1/B2/K3", 0);
|
||||
prefixCounts.put(ckp3, 3);
|
||||
|
||||
for (ContainerKeyPrefix prefix : prefixCounts.keySet()) {
|
||||
containerDbServiceProvider.storeContainerKeyMapping(
|
||||
prefix, prefixCounts.get(prefix));
|
||||
}
|
||||
|
||||
assertEquals(1, containerDbServiceProvider
|
||||
.getCountForForContainerKeyPrefix(ckp1).intValue());
|
||||
|
||||
prefixCounts.clear();
|
||||
prefixCounts.put(ckp2, 12);
|
||||
prefixCounts.put(ckp3, 13);
|
||||
ContainerKeyPrefix ckp4 = new ContainerKeyPrefix(containerId,
|
||||
"V1/B3/K1", 0);
|
||||
prefixCounts.put(ckp4, 14);
|
||||
ContainerKeyPrefix ckp5 = new ContainerKeyPrefix(containerId,
|
||||
"V1/B3/K2", 0);
|
||||
prefixCounts.put(ckp5, 15);
|
||||
|
||||
containerDbServiceProvider.initNewContainerDB(prefixCounts);
|
||||
Map<ContainerKeyPrefix, Integer> keyPrefixesForContainer =
|
||||
containerDbServiceProvider.getKeyPrefixesForContainer(containerId);
|
||||
|
||||
assertEquals(4, keyPrefixesForContainer.size());
|
||||
assertEquals(12, keyPrefixesForContainer.get(ckp2).intValue());
|
||||
assertEquals(13, keyPrefixesForContainer.get(ckp3).intValue());
|
||||
assertEquals(14, keyPrefixesForContainer.get(ckp4).intValue());
|
||||
assertEquals(15, keyPrefixesForContainer.get(ckp5).intValue());
|
||||
|
||||
assertEquals(0, containerDbServiceProvider
|
||||
.getCountForForContainerKeyPrefix(ckp1).intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStoreContainerKeyMapping() throws Exception {
|
||||
|
||||
@ -89,19 +139,23 @@ public void testStoreContainerKeyMapping() throws Exception {
|
||||
|
||||
for (String prefix : prefixCounts.keySet()) {
|
||||
ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix(
|
||||
containerId, prefix);
|
||||
containerId, prefix, 0);
|
||||
containerDbServiceProvider.storeContainerKeyMapping(
|
||||
containerKeyPrefix, prefixCounts.get(prefix));
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
MetaStoreIterator<MetadataStore.KeyValue> iterator =
|
||||
containerDBStore.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
iterator.next();
|
||||
count++;
|
||||
}
|
||||
assertTrue(count == 3);
|
||||
Assert.assertTrue(
|
||||
containerDbServiceProvider.getCountForForContainerKeyPrefix(
|
||||
new ContainerKeyPrefix(containerId, "V1/B1/K1",
|
||||
0)) == 1);
|
||||
Assert.assertTrue(
|
||||
containerDbServiceProvider.getCountForForContainerKeyPrefix(
|
||||
new ContainerKeyPrefix(containerId, "V1/B1/K2",
|
||||
0)) == 2);
|
||||
Assert.assertTrue(
|
||||
containerDbServiceProvider.getCountForForContainerKeyPrefix(
|
||||
new ContainerKeyPrefix(containerId, "V1/B2/K3",
|
||||
0)) == 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -109,11 +163,11 @@ public void testGetCountForForContainerKeyPrefix() throws Exception {
|
||||
long containerId = System.currentTimeMillis();
|
||||
|
||||
containerDbServiceProvider.storeContainerKeyMapping(new
|
||||
ContainerKeyPrefix(containerId, "V1/B1/K1"), 2);
|
||||
ContainerKeyPrefix(containerId, "V2/B1/K1"), 2);
|
||||
|
||||
Integer count = containerDbServiceProvider.
|
||||
getCountForForContainerKeyPrefix(new ContainerKeyPrefix(containerId,
|
||||
"V1/B1/K1"));
|
||||
"V2/B1/K1"));
|
||||
assertTrue(count == 2);
|
||||
}
|
||||
|
||||
@ -121,25 +175,32 @@ public void testGetCountForForContainerKeyPrefix() throws Exception {
|
||||
public void testGetKeyPrefixesForContainer() throws Exception {
|
||||
long containerId = System.currentTimeMillis();
|
||||
|
||||
containerDbServiceProvider.storeContainerKeyMapping(new
|
||||
ContainerKeyPrefix(containerId, "V1/B1/K1"), 1);
|
||||
ContainerKeyPrefix containerKeyPrefix1 = new
|
||||
ContainerKeyPrefix(containerId, "V3/B1/K1", 0);
|
||||
containerDbServiceProvider.storeContainerKeyMapping(containerKeyPrefix1,
|
||||
1);
|
||||
|
||||
containerDbServiceProvider.storeContainerKeyMapping(new
|
||||
ContainerKeyPrefix(containerId, "V1/B1/K2"), 2);
|
||||
ContainerKeyPrefix containerKeyPrefix2 = new ContainerKeyPrefix(
|
||||
containerId, "V3/B1/K2", 0);
|
||||
containerDbServiceProvider.storeContainerKeyMapping(containerKeyPrefix2,
|
||||
2);
|
||||
|
||||
long nextContainerId = System.currentTimeMillis();
|
||||
containerDbServiceProvider.storeContainerKeyMapping(new
|
||||
ContainerKeyPrefix(nextContainerId, "V1/B2/K1"), 3);
|
||||
long nextContainerId = containerId + 1000L;
|
||||
ContainerKeyPrefix containerKeyPrefix3 = new ContainerKeyPrefix(
|
||||
nextContainerId, "V3/B2/K1", 0);
|
||||
containerDbServiceProvider.storeContainerKeyMapping(containerKeyPrefix3,
|
||||
3);
|
||||
|
||||
Map<String, Integer> keyPrefixMap = containerDbServiceProvider
|
||||
.getKeyPrefixesForContainer(containerId);
|
||||
Map<ContainerKeyPrefix, Integer> keyPrefixMap =
|
||||
containerDbServiceProvider.getKeyPrefixesForContainer(containerId);
|
||||
assertTrue(keyPrefixMap.size() == 2);
|
||||
assertTrue(keyPrefixMap.get("V1/B1/K1") == 1);
|
||||
assertTrue(keyPrefixMap.get("V1/B1/K2") == 2);
|
||||
|
||||
keyPrefixMap = containerDbServiceProvider
|
||||
.getKeyPrefixesForContainer(nextContainerId);
|
||||
assertTrue(keyPrefixMap.get(containerKeyPrefix1) == 1);
|
||||
assertTrue(keyPrefixMap.get(containerKeyPrefix2) == 2);
|
||||
|
||||
keyPrefixMap = containerDbServiceProvider.getKeyPrefixesForContainer(
|
||||
nextContainerId);
|
||||
assertTrue(keyPrefixMap.size() == 1);
|
||||
assertTrue(keyPrefixMap.get("V1/B2/K1") == 3);
|
||||
assertTrue(keyPrefixMap.get(containerKeyPrefix3) == 3);
|
||||
}
|
||||
}
|
||||
|
@ -18,10 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.ozone.recon.spi.impl;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@ -32,21 +29,13 @@
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.om.BucketManager;
|
||||
import org.apache.hadoop.ozone.om.BucketManagerImpl;
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
|
||||
import org.apache.hadoop.ozone.recon.ReconUtils;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
|
||||
import org.apache.hadoop.utils.db.DBCheckpoint;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
@ -71,7 +60,8 @@
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
|
||||
@PrepareForTest(ReconUtils.class)
|
||||
public class TestOzoneManagerServiceProviderImpl {
|
||||
public class TestOzoneManagerServiceProviderImpl extends
|
||||
AbstractOMMetadataManagerTest {
|
||||
|
||||
private OMMetadataManager omMetadataManager;
|
||||
private ReconOMMetadataManager reconOMMetadataManager;
|
||||
@ -83,15 +73,16 @@ public class TestOzoneManagerServiceProviderImpl {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initializeNewOmMetadataManager();
|
||||
omMetadataManager = initializeNewOmMetadataManager();
|
||||
injector = Guice.createInjector(new AbstractModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
try {
|
||||
initializeNewOmMetadataManager();
|
||||
writeDataToOm(omMetadataManager, "key_one");
|
||||
bind(OzoneConfiguration.class).toInstance(
|
||||
getTestOzoneConfiguration());
|
||||
reconOMMetadataManager = getTestMetadataManager();
|
||||
reconOMMetadataManager = getTestMetadataManager(omMetadataManager);
|
||||
bind(ReconOMMetadataManager.class).toInstance(reconOMMetadataManager);
|
||||
ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
|
||||
getTestOzoneConfiguration());
|
||||
@ -102,18 +93,17 @@ protected void configure() {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testStart() throws Exception {
|
||||
@Test
|
||||
public void testInit() throws Exception {
|
||||
|
||||
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_one"));
|
||||
Assert.assertNull(reconOMMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_two"));
|
||||
|
||||
writeDataToOm();
|
||||
writeDataToOm(omMetadataManager, "key_two");
|
||||
DBCheckpoint checkpoint = omMetadataManager.getStore()
|
||||
.getCheckpoint(true);
|
||||
File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
|
||||
@ -123,8 +113,7 @@ public void testStart() throws Exception {
|
||||
CloseableHttpClient.class, String.class))
|
||||
.toReturn(inputStream);
|
||||
|
||||
ozoneManagerServiceProvider.start();
|
||||
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
|
||||
ozoneManagerServiceProvider.init();
|
||||
|
||||
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_one"));
|
||||
@ -187,89 +176,9 @@ public void testGetOzoneManagerDBSnapshot() throws Exception {
|
||||
*/
|
||||
private OzoneConfiguration getTestOzoneConfiguration() throws IOException {
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
|
||||
"0m");
|
||||
configuration.set(RECON_OM_SNAPSHOT_TASK_INTERVAL, "1m");
|
||||
configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR,
|
||||
temporaryFolder.newFolder().getAbsolutePath());
|
||||
return configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new OM Metadata manager instance.
|
||||
* @throws IOException ioEx
|
||||
*/
|
||||
private void initializeNewOmMetadataManager() throws IOException {
|
||||
File omDbDir = temporaryFolder.newFolder();
|
||||
OzoneConfiguration omConfiguration = new OzoneConfiguration();
|
||||
omConfiguration.set(OZONE_OM_DB_DIRS,
|
||||
omDbDir.getAbsolutePath());
|
||||
omMetadataManager = new OmMetadataManagerImpl(omConfiguration);
|
||||
|
||||
String volumeKey = omMetadataManager.getVolumeKey("sampleVol");
|
||||
OmVolumeArgs args =
|
||||
OmVolumeArgs.newBuilder()
|
||||
.setVolume("sampleVol")
|
||||
.setAdminName("TestUser")
|
||||
.setOwnerName("TestUser")
|
||||
.build();
|
||||
omMetadataManager.getVolumeTable().put(volumeKey, args);
|
||||
|
||||
BucketManager bucketManager = new BucketManagerImpl(omMetadataManager);
|
||||
OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
|
||||
.setVolumeName("sampleVol")
|
||||
.setBucketName("bucketOne")
|
||||
.build();
|
||||
bucketManager.createBucket(bucketInfo);
|
||||
|
||||
omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_one",
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName("key_one")
|
||||
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an instance of Recon OM Metadata manager.
|
||||
* @return ReconOMMetadataManager
|
||||
* @throws IOException when creating the RocksDB instance.
|
||||
*/
|
||||
private ReconOMMetadataManager getTestMetadataManager() throws IOException {
|
||||
|
||||
DBCheckpoint checkpoint = omMetadataManager.getStore()
|
||||
.getCheckpoint(true);
|
||||
assertNotNull(checkpoint.getCheckpointLocation());
|
||||
|
||||
File reconOmDbDir = temporaryFolder.newFolder();
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, reconOmDbDir
|
||||
.getAbsolutePath());
|
||||
|
||||
ReconOMMetadataManager reconOMMetaMgr =
|
||||
new ReconOmMetadataManagerImpl(configuration);
|
||||
reconOMMetaMgr.start(configuration);
|
||||
|
||||
reconOMMetaMgr.updateOmDB(
|
||||
checkpoint.getCheckpointLocation().toFile());
|
||||
return reconOMMetaMgr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a key to OM instance.
|
||||
* @throws IOException while writing.
|
||||
*/
|
||||
private void writeDataToOm() throws IOException {
|
||||
omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_two",
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName("key_two")
|
||||
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.spi.impl;
|
||||
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.ProvisionException;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
/**
|
||||
* Tests the class that provides the instance of the DB Store used by Recon to
|
||||
* store its container - key data.
|
||||
*/
|
||||
public class TestReconContainerDBProvider {
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
private Injector injector;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
tempFolder.create();
|
||||
injector = Guice.createInjector(new AbstractModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
File dbDir = tempFolder.getRoot();
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(OZONE_RECON_DB_DIR, dbDir.getAbsolutePath());
|
||||
bind(OzoneConfiguration.class).toInstance(configuration);
|
||||
bind(DBStore.class).toProvider(ReconContainerDBProvider.class).in(
|
||||
Singleton.class);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGet() throws Exception {
|
||||
|
||||
ReconContainerDBProvider reconContainerDBProvider = injector.getInstance(
|
||||
ReconContainerDBProvider.class);
|
||||
DBStore dbStore = reconContainerDBProvider.get();
|
||||
assertNotNull(dbStore);
|
||||
|
||||
ReconContainerDBProvider reconContainerDBProviderNew = new
|
||||
ReconContainerDBProvider();
|
||||
try {
|
||||
reconContainerDBProviderNew.get();
|
||||
fail();
|
||||
} catch (Exception e) {
|
||||
assertTrue(e instanceof ProvisionException);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,194 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.tasks;
|
||||
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
|
||||
import org.apache.hadoop.ozone.recon.ReconUtils;
|
||||
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
|
||||
import org.apache.hadoop.utils.db.DBCheckpoint;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.powermock.api.mockito.PowerMockito;
|
||||
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
||||
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
/**
|
||||
* Unit test for Container Key mapper task.
|
||||
*/
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
|
||||
@PrepareForTest(ReconUtils.class)
|
||||
public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
|
||||
|
||||
private ContainerDBServiceProvider containerDbServiceProvider;
|
||||
private OMMetadataManager omMetadataManager;
|
||||
private ReconOMMetadataManager reconOMMetadataManager;
|
||||
private Injector injector;
|
||||
private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
omMetadataManager = initializeNewOmMetadataManager();
|
||||
injector = Guice.createInjector(new AbstractModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
try {
|
||||
bind(OzoneConfiguration.class).toInstance(
|
||||
getTestOzoneConfiguration());
|
||||
|
||||
reconOMMetadataManager = getTestMetadataManager(omMetadataManager);
|
||||
bind(ReconOMMetadataManager.class).toInstance(reconOMMetadataManager);
|
||||
ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
|
||||
getTestOzoneConfiguration());
|
||||
bind(OzoneManagerServiceProvider.class)
|
||||
.toInstance(ozoneManagerServiceProvider);
|
||||
|
||||
bind(DBStore.class).toProvider(ReconContainerDBProvider.class).
|
||||
in(Singleton.class);
|
||||
bind(ContainerDBServiceProvider.class).to(
|
||||
ContainerDBServiceProviderImpl.class).in(Singleton.class);
|
||||
} catch (IOException e) {
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
});
|
||||
containerDbServiceProvider = injector.getInstance(
|
||||
ContainerDBServiceProvider.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRun() throws Exception{
|
||||
|
||||
Map<ContainerKeyPrefix, Integer> keyPrefixesForContainer =
|
||||
containerDbServiceProvider.getKeyPrefixesForContainer(1);
|
||||
assertTrue(keyPrefixesForContainer.isEmpty());
|
||||
|
||||
keyPrefixesForContainer = containerDbServiceProvider
|
||||
.getKeyPrefixesForContainer(2);
|
||||
assertTrue(keyPrefixesForContainer.isEmpty());
|
||||
|
||||
Pipeline pipeline = getRandomPipeline();
|
||||
|
||||
List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
|
||||
BlockID blockID1 = new BlockID(1, 1);
|
||||
OmKeyLocationInfo omKeyLocationInfo1 = getOmKeyLocationInfo(blockID1,
|
||||
pipeline);
|
||||
|
||||
BlockID blockID2 = new BlockID(2, 1);
|
||||
OmKeyLocationInfo omKeyLocationInfo2
|
||||
= getOmKeyLocationInfo(blockID2, pipeline);
|
||||
|
||||
omKeyLocationInfoList.add(omKeyLocationInfo1);
|
||||
omKeyLocationInfoList.add(omKeyLocationInfo2);
|
||||
|
||||
OmKeyLocationInfoGroup omKeyLocationInfoGroup = new
|
||||
OmKeyLocationInfoGroup(0, omKeyLocationInfoList);
|
||||
|
||||
writeDataToOm(omMetadataManager,
|
||||
"key_one",
|
||||
"bucketOne",
|
||||
"sampleVol",
|
||||
Collections.singletonList(omKeyLocationInfoGroup));
|
||||
|
||||
//Take snapshot of OM DB and copy over to Recon OM DB.
|
||||
DBCheckpoint checkpoint = omMetadataManager.getStore()
|
||||
.getCheckpoint(true);
|
||||
File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
|
||||
InputStream inputStream = new FileInputStream(tarFile);
|
||||
PowerMockito.stub(PowerMockito.method(ReconUtils.class,
|
||||
"makeHttpCall",
|
||||
CloseableHttpClient.class, String.class))
|
||||
.toReturn(inputStream);
|
||||
|
||||
ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask(
|
||||
ozoneManagerServiceProvider, containerDbServiceProvider);
|
||||
containerKeyMapperTask.run();
|
||||
|
||||
keyPrefixesForContainer =
|
||||
containerDbServiceProvider.getKeyPrefixesForContainer(1);
|
||||
assertTrue(keyPrefixesForContainer.size() == 1);
|
||||
String omKey = omMetadataManager.getOzoneKey("sampleVol",
|
||||
"bucketOne", "key_one");
|
||||
ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix(1,
|
||||
omKey, 0);
|
||||
assertEquals(1,
|
||||
keyPrefixesForContainer.get(containerKeyPrefix).intValue());
|
||||
|
||||
keyPrefixesForContainer =
|
||||
containerDbServiceProvider.getKeyPrefixesForContainer(2);
|
||||
assertTrue(keyPrefixesForContainer.size() == 1);
|
||||
containerKeyPrefix = new ContainerKeyPrefix(2, omKey,
|
||||
0);
|
||||
assertEquals(1,
|
||||
keyPrefixesForContainer.get(containerKeyPrefix).intValue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Test OzoneConfiguration instance.
|
||||
* @return OzoneConfiguration
|
||||
* @throws IOException ioEx.
|
||||
*/
|
||||
private OzoneConfiguration getTestOzoneConfiguration()
|
||||
throws IOException {
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR,
|
||||
temporaryFolder.newFolder().getAbsolutePath());
|
||||
configuration.set(OZONE_RECON_DB_DIR, temporaryFolder.newFolder()
|
||||
.getAbsolutePath());
|
||||
return configuration;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* The classes in this package tests the various scheduled tasks used by
|
||||
* Recon.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.recon.tasks;
|
Loading…
Reference in New Issue
Block a user