diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java index 9e0c4a4b42..d01dfe44d3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Map; import org.apache.hadoop.classification.InterfaceStability; @@ -158,4 +159,17 @@ void move(KEY sourceKey, KEY destKey, VALUE value, * @return DB file location. */ File getDbLocation(); + + /** + * Get List of Index to Table Names. + * (For decoding table from column family index) + * @return Map of Index -> TableName + */ + Map getTableNames(); + + /** + * Get Codec registry. + * @return codec registry. + */ + CodecRegistry getCodecRegistry(); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java index 34bdc5dbc3..3459b2032e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java @@ -57,7 +57,6 @@ public final class DBStoreBuilder { private List tableNames; private Configuration configuration; private CodecRegistry registry; - private boolean readOnly = false; private DBStoreBuilder(Configuration configuration) { tables = new HashSet<>(); @@ -114,11 +113,6 @@ public DBStoreBuilder setPath(Path path) { return this; } - public DBStoreBuilder setReadOnly(boolean rdOnly) { - readOnly = rdOnly; - return this; - } - /** * Builds a DBStore instance and returns that. * @@ -137,7 +131,7 @@ public DBStore build() throws IOException { if (!dbFile.getParentFile().exists()) { throw new IOException("The DB destination directory should exist."); } - return new RDBStore(dbFile, options, tables, registry, readOnly); + return new RDBStore(dbFile, options, tables, registry); } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java index 07d74c4f46..d293c1d215 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java @@ -64,15 +64,16 @@ public class RDBStore implements DBStore { private ObjectName statMBeanName; private RDBCheckpointManager checkPointManager; private String checkpointsParentDir; + private List columnFamilyHandles; @VisibleForTesting public RDBStore(File dbFile, DBOptions options, Set families) throws IOException { - this(dbFile, options, families, new CodecRegistry(), false); + this(dbFile, options, families, new CodecRegistry()); } public RDBStore(File dbFile, DBOptions options, Set families, - CodecRegistry registry, boolean readOnly) + CodecRegistry registry) throws IOException { Preconditions.checkNotNull(dbFile, "DB file location cannot be null"); Preconditions.checkNotNull(families); @@ -81,7 +82,7 @@ public RDBStore(File dbFile, DBOptions options, Set families, codecRegistry = registry; final List columnFamilyDescriptors = new ArrayList<>(); - final List columnFamilyHandles = new ArrayList<>(); + columnFamilyHandles = new ArrayList<>(); for (TableConfig family : families) { columnFamilyDescriptors.add(family.getDescriptor()); @@ -93,13 +94,8 @@ public RDBStore(File dbFile, DBOptions options, Set families, writeOptions = new WriteOptions(); try { - if (readOnly) { - db = RocksDB.openReadOnly(dbOptions, dbLocation.getAbsolutePath(), - columnFamilyDescriptors, columnFamilyHandles); - } else { - db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(), - columnFamilyDescriptors, columnFamilyHandles); - } + db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(), + columnFamilyDescriptors, columnFamilyHandles); for (int x = 0; x < columnFamilyHandles.size(); x++) { handleTable.put( @@ -299,7 +295,31 @@ public File getDbLocation() { return dbLocation; } + @Override + public Map getTableNames() { + Map tableNames = new HashMap<>(); + StringCodec stringCodec = new StringCodec(); + + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + try { + tableNames.put(columnFamilyHandle.getID(), stringCodec + .fromPersistedFormat(columnFamilyHandle.getName())); + } catch (RocksDBException | IOException e) { + LOG.error("Unexpected exception while reading column family handle " + + "name", e); + } + } + return tableNames; + } + + @Override public CodecRegistry getCodecRegistry() { return codecRegistry; } + + @VisibleForTesting + public RocksDB getDb() { + return db; + } + } \ No newline at end of file diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 305cac5a2c..9b941a00cc 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -2472,4 +2472,12 @@ connections. + + ozone.recon.task.thread.count + 1 + OZONE, RECON + + The number of Recon Tasks that are waiting on updates from OM. + + diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java index 24a9ee50bf..6d51034252 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java @@ -21,7 +21,6 @@ import javax.management.MBeanServer; -import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.charset.StandardCharsets; @@ -290,46 +289,4 @@ public void testRocksDBCheckpointCleanup() throws Exception { checkpoint.getCheckpointLocation())); } } - - @Test - public void testReadOnlyRocksDB() throws Exception { - File dbFile = folder.newFolder(); - byte[] key = "Key1".getBytes(); - byte[] value = "Value1".getBytes(); - - //Create Rdb and write some data into it. - RDBStore newStore = new RDBStore(dbFile, options, configSet); - Assert.assertNotNull("DB Store cannot be null", newStore); - Table firstTable = newStore.getTable(families.get(0)); - Assert.assertNotNull("Table cannot be null", firstTable); - firstTable.put(key, value); - - RocksDBCheckpoint checkpoint = (RocksDBCheckpoint) newStore.getCheckpoint( - true); - - //Create Read Only DB from snapshot of first DB. - RDBStore snapshotStore = new RDBStore(checkpoint.getCheckpointLocation() - .toFile(), options, configSet, new CodecRegistry(), true); - - Assert.assertNotNull("DB Store cannot be null", newStore); - - //Verify read is allowed. - firstTable = snapshotStore.getTable(families.get(0)); - Assert.assertNotNull("Table cannot be null", firstTable); - Assert.assertTrue(Arrays.equals(((byte[])firstTable.get(key)), value)); - - //Verify write is not allowed. - byte[] key2 = - RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8); - byte[] value2 = - RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8); - try { - firstTable.put(key2, value2); - Assert.fail(); - } catch (IOException e) { - Assert.assertTrue(e.getMessage() - .contains("Not supported operation in read only mode")); - } - checkpoint.cleanupCheckpoint(); - } } \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index ece04ddf61..0720a10b24 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -100,16 +100,16 @@ public class OmMetadataManagerImpl implements OMMetadataManager { * |-------------------------------------------------------------------| */ - private static final String USER_TABLE = "userTable"; - private static final String VOLUME_TABLE = "volumeTable"; - private static final String BUCKET_TABLE = "bucketTable"; - private static final String KEY_TABLE = "keyTable"; - private static final String DELETED_TABLE = "deletedTable"; - private static final String OPEN_KEY_TABLE = "openKeyTable"; - private static final String S3_TABLE = "s3Table"; - private static final String MULTIPARTINFO_TABLE = "multipartInfoTable"; - private static final String S3_SECRET_TABLE = "s3SecretTable"; - private static final String DELEGATION_TOKEN_TABLE = "dTokenTable"; + public static final String USER_TABLE = "userTable"; + public static final String VOLUME_TABLE = "volumeTable"; + public static final String BUCKET_TABLE = "bucketTable"; + public static final String KEY_TABLE = "keyTable"; + public static final String DELETED_TABLE = "deletedTable"; + public static final String OPEN_KEY_TABLE = "openKeyTable"; + public static final String S3_TABLE = "s3Table"; + public static final String MULTIPARTINFO_TABLE = "multipartInfoTable"; + public static final String S3_SECRET_TABLE = "s3SecretTable"; + public static final String DELEGATION_TOKEN_TABLE = "dTokenTable"; private DBStore store; diff --git a/hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/codegen/ReconSchemaGenerationModule.java b/hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/codegen/ReconSchemaGenerationModule.java index fa44e46057..5e06b99324 100644 --- a/hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/codegen/ReconSchemaGenerationModule.java +++ b/hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/codegen/ReconSchemaGenerationModule.java @@ -17,6 +17,7 @@ */ package org.hadoop.ozone.recon.codegen; +import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition; import org.hadoop.ozone.recon.schema.ReconSchemaDefinition; import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition; @@ -34,6 +35,6 @@ protected void configure() { Multibinder schemaBinder = Multibinder.newSetBinder(binder(), ReconSchemaDefinition.class); schemaBinder.addBinding().to(UtilizationSchemaDefinition.class); - + schemaBinder.addBinding().to(ReconInternalSchemaDefinition.class); } } diff --git a/hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ReconInternalSchemaDefinition.java b/hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ReconInternalSchemaDefinition.java new file mode 100644 index 0000000000..9ab9e38e95 --- /dev/null +++ b/hadoop-ozone/ozone-recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ReconInternalSchemaDefinition.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.hadoop.ozone.recon.schema; + +import java.sql.Connection; +import java.sql.SQLException; + +import javax.sql.DataSource; + +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; + +import com.google.inject.Inject; + +/** + * Class used to create tables that are required for Recon's internal + * management. + */ +public class ReconInternalSchemaDefinition implements ReconSchemaDefinition { + + public static final String RECON_TASK_STATUS_TABLE_NAME = + "recon_task_status"; + private final DataSource dataSource; + + @Inject + ReconInternalSchemaDefinition(DataSource dataSource) { + this.dataSource = dataSource; + } + + @Override + public void initializeSchema() throws SQLException { + Connection conn = dataSource.getConnection(); + createReconTaskStatus(conn); + } + + /** + * Create the Recon Task Status table. + * @param conn connection + */ + private void createReconTaskStatus(Connection conn) { + DSL.using(conn).createTableIfNotExists(RECON_TASK_STATUS_TABLE_NAME) + .column("task_name", SQLDataType.VARCHAR(1024)) + .column("last_updated_timestamp", SQLDataType.BIGINT) + .column("last_updated_seq_number", SQLDataType.BIGINT) + .constraint(DSL.constraint("pk_task_name") + .primaryKey("task_name")) + .execute(); + } +} diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java index 0576c6b0f3..3473a6281e 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java @@ -39,6 +39,8 @@ import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider; import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; +import org.apache.hadoop.ozone.recon.tasks.ReconTaskController; +import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl; import org.apache.hadoop.utils.db.DBStore; import com.google.inject.AbstractModule; @@ -65,6 +67,9 @@ protected void configure() { // Persistence - inject configuration provider install(new JooqPersistenceModule( getProvider(DataSourceConfiguration.class))); + + bind(ReconTaskController.class) + .to(ReconTaskControllerImpl.class).in(Singleton.class); } @Provides diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java index cea168121b..fb2dcc38dc 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT; +import java.io.IOException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -67,7 +68,7 @@ public Void call() throws Exception { @Override protected void configureServlets() { rest("/api/*") - .packages("org.apache.hadoop.ozone.recon.api"); + .packages("org.apache.hadoop.ozone.recon.api"); } }); @@ -100,10 +101,6 @@ private void scheduleReconTasks() { OzoneManagerServiceProvider ozoneManagerServiceProvider = injector .getInstance(OzoneManagerServiceProvider.class); - // Schedule the task to read OM DB and write the reverse mapping to Recon - // container DB. - ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask( - ozoneManagerServiceProvider, containerDBServiceProvider); long initialDelay = configuration.getTimeDuration( RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY, RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT, @@ -113,8 +110,22 @@ private void scheduleReconTasks() { RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - scheduler.scheduleWithFixedDelay(containerKeyMapperTask, initialDelay, - interval, TimeUnit.MILLISECONDS); + + scheduler.scheduleWithFixedDelay(() -> { + try { + ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot(); + // Schedule the task to read OM DB and write the reverse mapping to + // Recon container DB. + ContainerKeyMapperTask containerKeyMapperTask = + new ContainerKeyMapperTask(containerDBServiceProvider, + ozoneManagerServiceProvider.getOMMetadataManagerInstance()); + containerKeyMapperTask.reprocess( + ozoneManagerServiceProvider.getOMMetadataManagerInstance()); + } catch (IOException e) { + LOG.error("Unable to get OM " + + "Snapshot", e); + } + }, initialDelay, interval, TimeUnit.MILLISECONDS); } void stop() throws Exception { diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index c779e113b6..0501093ae0 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -112,6 +112,10 @@ public final class ReconServerConfigKeys { public static final String OZONE_RECON_SQL_MAX_IDLE_CONNECTION_TEST_STMT = "ozone.recon.sql.db.conn.idle.test"; + public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY = + "ozone.recon.task.thread.count"; + public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 1; + /** * Private constructor for utility class. */ diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOmMetadataManagerImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOmMetadataManagerImpl.java index 3b0fb49f7c..409a7e9cf9 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOmMetadataManagerImpl.java @@ -64,7 +64,6 @@ private void initializeNewRdbStore(File dbFile) throws IOException { try { DBStoreBuilder dbStoreBuilder = DBStoreBuilder.newBuilder(ozoneConfiguration) - .setReadOnly(true) .setName(dbFile.getName()) .setPath(dbFile.toPath().getParent()); addOMTablesAndCodecs(dbStoreBuilder); diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java index a1044ec23b..0449e7cf77 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ContainerDBServiceProvider.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata; +import org.apache.hadoop.utils.db.TableIterator; /** * The Recon Container DB Service interface. @@ -75,4 +76,19 @@ Map getKeyPrefixesForContainer(long containerId) * @throws IOException */ Map getContainers() throws IOException; + + /** + * Delete an entry in the container DB. + * @param containerKeyPrefix container key prefix to be deleted. + * @throws IOException exception. + */ + void deleteContainerMapping(ContainerKeyPrefix containerKeyPrefix) + throws IOException; + + /** + * Get iterator to the entire container DB. + * @return TableIterator + * @throws IOException exception + */ + TableIterator getContainerTableIterator() throws IOException; } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java index 3a20e82c9d..e79b8044f0 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java @@ -191,4 +191,15 @@ public Map getContainers() throws IOException { } return containers; } + + @Override + public void deleteContainerMapping(ContainerKeyPrefix containerKeyPrefix) + throws IOException { + containerKeyTable.delete(containerKeyPrefix); + } + + @Override + public TableIterator getContainerTableIterator() throws IOException { + return containerKeyTable.iterator(); + } } \ No newline at end of file diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java index 9ec1a79448..47dfff0f2c 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java @@ -21,14 +21,20 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider; -import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider; import org.apache.hadoop.utils.db.Table; import org.apache.hadoop.utils.db.TableIterator; import org.slf4j.Logger; @@ -38,19 +44,24 @@ * Class to iterate over the OM DB and populate the Recon container DB with * the container -> Key reverse mapping. */ -public class ContainerKeyMapperTask implements Runnable { +public class ContainerKeyMapperTask extends ReconDBUpdateTask { private static final Logger LOG = LoggerFactory.getLogger(ContainerKeyMapperTask.class); - private OzoneManagerServiceProvider ozoneManagerServiceProvider; private ContainerDBServiceProvider containerDBServiceProvider; + private Collection tables = new ArrayList<>(); - public ContainerKeyMapperTask( - OzoneManagerServiceProvider ozoneManagerServiceProvider, - ContainerDBServiceProvider containerDBServiceProvider) { - this.ozoneManagerServiceProvider = ozoneManagerServiceProvider; + public ContainerKeyMapperTask(ContainerDBServiceProvider + containerDBServiceProvider, + OMMetadataManager omMetadataManager) { + super("ContainerKeyMapperTask"); this.containerDBServiceProvider = containerDBServiceProvider; + try { + tables.add(omMetadataManager.getKeyTable().getName()); + } catch (IOException ioEx) { + LOG.error("Unable to listen on Key Table updates ", ioEx); + } } /** @@ -58,55 +69,122 @@ public ContainerKeyMapperTask( * (container, key) -> count to Recon Container DB. */ @Override - public void run() { + public Pair reprocess(OMMetadataManager omMetadataManager) { int omKeyCount = 0; - int containerCount = 0; try { - LOG.info("Starting a run of ContainerKeyMapperTask."); + LOG.info("Starting a 'reprocess' run of ContainerKeyMapperTask."); Instant start = Instant.now(); - //Update OM DB Snapshot. - ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot(); - - OMMetadataManager omMetadataManager = ozoneManagerServiceProvider - .getOMMetadataManagerInstance(); Table omKeyInfoTable = omMetadataManager.getKeyTable(); try (TableIterator> keyIter = omKeyInfoTable.iterator()) { while (keyIter.hasNext()) { Table.KeyValue kv = keyIter.next(); - StringBuilder key = new StringBuilder(kv.getKey()); OmKeyInfo omKeyInfo = kv.getValue(); - for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo - .getKeyLocationVersions()) { - long keyVersion = omKeyLocationInfoGroup.getVersion(); - for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup - .getLocationList()) { - long containerId = omKeyLocationInfo.getContainerID(); - ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix( - containerId, key.toString(), keyVersion); - if (containerDBServiceProvider.getCountForForContainerKeyPrefix( - containerKeyPrefix) == 0) { - // Save on writes. No need to save same container-key prefix - // mapping again. - containerDBServiceProvider.storeContainerKeyMapping( - containerKeyPrefix, 1); - } - containerCount++; - } - } + writeOMKeyToContainerDB(kv.getKey(), omKeyInfo); omKeyCount++; } } - LOG.info("Completed the run of ContainerKeyMapperTask."); + LOG.info("Completed 'reprocess' of ContainerKeyMapperTask."); Instant end = Instant.now(); long duration = Duration.between(start, end).toMillis(); - LOG.info("It took me " + (double)duration / 1000.0 + " seconds to " + - "process " + omKeyCount + " keys and " + containerCount + " " + - "containers."); + LOG.info("It took me " + (double) duration / 1000.0 + " seconds to " + + "process " + omKeyCount + " keys."); } catch (IOException ioEx) { LOG.error("Unable to populate Container Key Prefix data in Recon DB. ", ioEx); + return new ImmutablePair<>(getTaskName(), false); + } + return new ImmutablePair<>(getTaskName(), true); + } + + + @Override + protected Collection getTaskTables() { + return tables; + } + + @Override + Pair process(OMUpdateEventBatch events) { + Iterator eventIterator = events.getIterator(); + while (eventIterator.hasNext()) { + OMDBUpdateEvent omdbUpdateEvent = eventIterator.next(); + String updatedKey = omdbUpdateEvent.getKey(); + OmKeyInfo updatedKeyValue = omdbUpdateEvent.getValue(); + try { + switch (omdbUpdateEvent.getAction()) { + case PUT: + writeOMKeyToContainerDB(updatedKey, updatedKeyValue); + break; + + case DELETE: + deleteOMKeyFromContainerDB(updatedKey); + break; + + default: LOG.debug("Skipping DB update event : " + omdbUpdateEvent + .getAction()); + } + } catch (IOException e) { + LOG.error("Unexpected exception while updating key data : {} ", e); + return new ImmutablePair<>(getTaskName(), false); + } + } + return new ImmutablePair<>(getTaskName(), true); + } + + /** + * Delete an OM Key from Container DB. + * @param key key String. + * @throws IOException If Unable to write to container DB. + */ + private void deleteOMKeyFromContainerDB(String key) + throws IOException { + + TableIterator> containerIterator = + containerDBServiceProvider.getContainerTableIterator(); + + Set keysToDeDeleted = new HashSet<>(); + + while (containerIterator.hasNext()) { + Table.KeyValue keyValue = + containerIterator.next(); + String keyPrefix = keyValue.getKey().getKeyPrefix(); + if (keyPrefix.equals(key)) { + keysToDeDeleted.add(keyValue.getKey()); + } + } + + for (ContainerKeyPrefix containerKeyPrefix : keysToDeDeleted) { + containerDBServiceProvider.deleteContainerMapping(containerKeyPrefix); } } + + /** + * Write an OM key to container DB. + * @param key key String + * @param omKeyInfo omKeyInfo value + * @throws IOException if unable to write to recon DB. + */ + private void writeOMKeyToContainerDB(String key, OmKeyInfo omKeyInfo) + throws IOException { + for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo + .getKeyLocationVersions()) { + long keyVersion = omKeyLocationInfoGroup.getVersion(); + for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup + .getLocationList()) { + long containerId = omKeyLocationInfo.getContainerID(); + ContainerKeyPrefix containerKeyPrefix = new ContainerKeyPrefix( + containerId, key, keyVersion); + if (containerDBServiceProvider.getCountForForContainerKeyPrefix( + containerKeyPrefix) == 0) { + // Save on writes. No need to save same container-key prefix + // mapping again. + containerDBServiceProvider.storeContainerKeyMapping( + containerKeyPrefix, 1); + } + } + } + } + } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java new file mode 100644 index 0000000000..82b7a35912 --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +/** + * A class used to encapsulate a single OM DB update event. + * Currently only PUT and DELETE are supported. + * @param Type of Key. + * @param Type of Value. + */ +public final class OMDBUpdateEvent { + + private final OMDBUpdateAction action; + private final String table; + private final KEY updatedKey; + private final VALUE updatedValue; + private final EventInfo eventInfo; + + private OMDBUpdateEvent(OMDBUpdateAction action, + String table, + KEY updatedKey, + VALUE updatedValue, + EventInfo eventInfo) { + this.action = action; + this.table = table; + this.updatedKey = updatedKey; + this.updatedValue = updatedValue; + this.eventInfo = eventInfo; + } + + public OMDBUpdateAction getAction() { + return action; + } + + public String getTable() { + return table; + } + + public KEY getKey() { + return updatedKey; + } + + public VALUE getValue() { + return updatedValue; + } + + public EventInfo getEventInfo() { + return eventInfo; + } + + /** + * Builder used to construct an OM DB Update event. + * @param Key type. + * @param Value type. + */ + public static class OMUpdateEventBuilder { + + private OMDBUpdateAction action; + private String table; + private KEY updatedKey; + private VALUE updatedValue; + private EventInfo eventInfo; + + OMUpdateEventBuilder setAction(OMDBUpdateAction omdbUpdateAction) { + this.action = omdbUpdateAction; + return this; + } + + OMUpdateEventBuilder setTable(String tableName) { + this.table = tableName; + return this; + } + + OMUpdateEventBuilder setKey(KEY key) { + this.updatedKey = key; + return this; + } + + OMUpdateEventBuilder setValue(VALUE value) { + this.updatedValue = value; + return this; + } + + OMUpdateEventBuilder setEventInfo(long sequenceNumber, + long eventTimestampMillis) { + this.eventInfo = new EventInfo(sequenceNumber, + eventTimestampMillis); + return this; + } + + /** + * Build an OM update event. + * @return OMDBUpdateEvent + */ + public OMDBUpdateEvent build() { + return new OMDBUpdateEvent( + action, + table, + updatedKey, + updatedValue, + eventInfo); + } + } + + /** + * Class used to hold timing information for an event. (Seq number and + * timestamp) + */ + public static class EventInfo { + private long sequenceNumber; + private long eventTimestampMillis; + + public EventInfo(long sequenceNumber, + long eventTimestampMillis) { + this.sequenceNumber = sequenceNumber; + this.eventTimestampMillis = eventTimestampMillis; + } + + public long getSequenceNumber() { + return sequenceNumber; + } + + public long getEventTimestampMillis() { + return eventTimestampMillis; + } + } + + /** + * Supported Actions - PUT, DELETE. + */ + public enum OMDBUpdateAction { + PUT, DELETE + } +} diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java new file mode 100644 index 0000000000..d2d11b2865 --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.utils.db.CodecRegistry; +import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class used to listen on OM RocksDB updates. + */ +public class OMDBUpdatesHandler extends WriteBatch.Handler { + + private static final Logger LOG = + LoggerFactory.getLogger(OMDBUpdatesHandler.class); + + private Map tablesNames; + private CodecRegistry codecRegistry; + private List omdbUpdateEvents = new ArrayList<>(); + + public OMDBUpdatesHandler(OMMetadataManager omMetadataManager) { + tablesNames = omMetadataManager.getStore().getTableNames(); + codecRegistry = omMetadataManager.getStore().getCodecRegistry(); + } + + @Override + public void put(int cfIndex, byte[] keyBytes, byte[] valueBytes) throws + RocksDBException { + try { + processEvent(cfIndex, keyBytes, valueBytes, + OMDBUpdateEvent.OMDBUpdateAction.PUT); + } catch (IOException ioEx) { + LOG.error("Exception when reading key : " + ioEx); + } + } + + @Override + public void delete(int cfIndex, byte[] keyBytes) throws RocksDBException { + try { + processEvent(cfIndex, keyBytes, null, + OMDBUpdateEvent.OMDBUpdateAction.DELETE); + } catch (IOException ioEx) { + LOG.error("Exception when reading key : " + ioEx); + } + } + + /** + * + */ + private void processEvent(int cfIndex, byte[] keyBytes, byte[] + valueBytes, OMDBUpdateEvent.OMDBUpdateAction action) + throws IOException { + String tableName = tablesNames.get(cfIndex); + Class keyType = getKeyType(tableName); + Class valueType = getValueType(tableName); + if (valueType != null) { + OMDBUpdateEvent.OMUpdateEventBuilder builder = + new OMDBUpdateEvent.OMUpdateEventBuilder<>(); + builder.setTable(tableName); + + Object key = codecRegistry.asObject(keyBytes, keyType); + builder.setKey(key); + + if (!action.equals(OMDBUpdateEvent.OMDBUpdateAction.DELETE)) { + Object value = codecRegistry.asObject(valueBytes, valueType); + builder.setValue(value); + } + + builder.setAction(action); + OMDBUpdateEvent event = builder.build(); + LOG.info("Generated OM update Event for table : " + event.getTable() + + ", Key = " + event.getKey()); + // Temporarily adding to an event buffer for testing. In subsequent JIRAs, + // a Recon side class will be implemented that requests delta updates + // from OM and calls on this handler. In that case, we will fill up + // this buffer and pass it on to the ReconTaskController which has + // tasks waiting on OM events. + omdbUpdateEvents.add(event); + } + } + + // There are no use cases yet for the remaining methods in Recon. These + // will be implemented as and when need arises. + + @Override + public void put(byte[] bytes, byte[] bytes1) { + + } + + @Override + public void merge(int i, byte[] bytes, byte[] bytes1) + throws RocksDBException { + } + + @Override + public void merge(byte[] bytes, byte[] bytes1) { + } + + @Override + public void delete(byte[] bytes) { + } + + @Override + public void singleDelete(int i, byte[] bytes) throws RocksDBException { + } + + @Override + public void singleDelete(byte[] bytes) { + } + + @Override + public void deleteRange(int i, byte[] bytes, byte[] bytes1) + throws RocksDBException { + } + + @Override + public void deleteRange(byte[] bytes, byte[] bytes1) { + + } + + @Override + public void logData(byte[] bytes) { + + } + + @Override + public void putBlobIndex(int i, byte[] bytes, byte[] bytes1) + throws RocksDBException { + } + + @Override + public void markBeginPrepare() throws RocksDBException { + + } + + @Override + public void markEndPrepare(byte[] bytes) throws RocksDBException { + + } + + @Override + public void markNoop(boolean b) throws RocksDBException { + + } + + @Override + public void markRollback(byte[] bytes) throws RocksDBException { + + } + + @Override + public void markCommit(byte[] bytes) throws RocksDBException { + + } + + /** + * Return Key type class for a given table name. + * @param name table name. + * @return String.class by default. + */ + private Class getKeyType(String name) { + return String.class; + } + + /** + * Return Value type class for a given table. + * @param name table name + * @return Value type based on table name. + */ + @VisibleForTesting + protected Class getValueType(String name) { + switch (name) { + case KEY_TABLE : return OmKeyInfo.class; + case VOLUME_TABLE : return OmVolumeArgs.class; + case BUCKET_TABLE : return OmBucketInfo.class; + default: return null; + } + } + + /** + * Get List of events. (Temporary API to unit test the class). + * @return List of events. + */ + public List getEvents() { + return omdbUpdateEvents; + } + +} diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java new file mode 100644 index 0000000000..3b7cc5bcf2 --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Wrapper class to hold multiple OM DB update events. + */ +public class OMUpdateEventBatch { + + private List events; + + OMUpdateEventBatch(Collection e) { + events = new ArrayList<>(e); + } + + /** + * Get Sequence Number and timestamp of last event in this batch. + * @return Event Info instance. + */ + OMDBUpdateEvent.EventInfo getLastEventInfo() { + if (events.isEmpty()) { + return new OMDBUpdateEvent.EventInfo(-1, -1); + } else { + return events.get(events.size() - 1).getEventInfo(); + } + } + + /** + * Return iterator to Event batch. + * @return iterator + */ + public Iterator getIterator() { + return events.iterator(); + } + + /** + * Filter events based on Tables. + * @param tables set of tables to filter on. + * @return trimmed event batch. + */ + public OMUpdateEventBatch filter(Collection tables) { + return new OMUpdateEventBatch(events + .stream() + .filter(e -> tables.contains(e.getTable())) + .collect(Collectors.toList())); + } +} diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java new file mode 100644 index 0000000000..d828577af6 --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import java.util.Collection; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.ozone.om.OMMetadataManager; + +/** + * Abstract class used to denote a Recon task that needs to act on OM DB events. + */ +public abstract class ReconDBUpdateTask { + + private String taskName; + + protected ReconDBUpdateTask(String taskName) { + this.taskName = taskName; + } + + /** + * Return task name. + * @return task name + */ + public String getTaskName() { + return taskName; + } + + /** + * Return the list of tables that the task is listening on. + * Empty list means the task is NOT listening on any tables. + * @return Collection of Tables. + */ + protected abstract Collection getTaskTables(); + + /** + * Process a set of OM events on tables that the task is listening on. + * @param events Set of events to be processed by the task. + * @return Pair of task name -> task success. + */ + abstract Pair process(OMUpdateEventBatch events); + + /** + * Process a on tables that the task is listening on. + * @param omMetadataManager OM Metadata manager instance. + * @return Pair of task name -> task success. + */ + abstract Pair reprocess(OMMetadataManager omMetadataManager); + +} diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java new file mode 100644 index 0000000000..7548cc9116 --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import java.util.Map; + +/** + * Controller used by Recon to manage Tasks that are waiting on Recon events. + */ +public interface ReconTaskController { + + /** + * Register API used by tasks to register themselves. + * @param task task instance + */ + void registerTask(ReconDBUpdateTask task); + + /** + * Pass on a set of OM DB update events to the registered tasks. + * @param events set of events + * @throws InterruptedException InterruptedException + */ + void consumeOMEvents(OMUpdateEventBatch events) throws InterruptedException; + + /** + * Get set of registered tasks. + * @return Map of Task name -> Task. + */ + Map getRegisteredTasks(); +} diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java new file mode 100644 index 0000000000..3fd7d966eb --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_KEY; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; +import org.jooq.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; + +/** + * Implementation of ReconTaskController. + */ +public class ReconTaskControllerImpl implements ReconTaskController { + + private static final Logger LOG = + LoggerFactory.getLogger(ReconTaskControllerImpl.class); + + private Map reconDBUpdateTasks; + private ExecutorService executorService; + private int threadCount = 1; + private final Semaphore taskSemaphore = new Semaphore(1); + private final ReconOMMetadataManager omMetadataManager; + private Map taskFailureCounter = new HashMap<>(); + private static final int TASK_FAILURE_THRESHOLD = 2; + private ReconTaskStatusDao reconTaskStatusDao; + + @Inject + public ReconTaskControllerImpl(OzoneConfiguration configuration, + ReconOMMetadataManager omMetadataManager, + Configuration sqlConfiguration) { + this.omMetadataManager = omMetadataManager; + reconDBUpdateTasks = new HashMap<>(); + threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY, + OZONE_RECON_TASK_THREAD_COUNT_DEFAULT); + executorService = Executors.newFixedThreadPool(threadCount); + reconTaskStatusDao = new ReconTaskStatusDao(sqlConfiguration); + } + + @Override + public void registerTask(ReconDBUpdateTask task) { + String taskName = task.getTaskName(); + LOG.info("Registered task " + taskName + " with controller."); + + // Store task in Task Map. + reconDBUpdateTasks.put(taskName, task); + // Store Task in Task failure tracker. + taskFailureCounter.put(taskName, new AtomicInteger(0)); + // Create DB record for the task. + ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName, + 0L, 0L); + reconTaskStatusDao.insert(reconTaskStatusRecord); + } + + /** + * For every registered task, we try process step twice and then reprocess + * once (if process failed twice) to absorb the events. If a task has failed + * reprocess call more than 2 times across events, it is unregistered + * (blacklisted). + * @param events set of events + * @throws InterruptedException + */ + @Override + public void consumeOMEvents(OMUpdateEventBatch events) + throws InterruptedException { + taskSemaphore.acquire(); + + try { + Collection> tasks = new ArrayList<>(); + for (Map.Entry taskEntry : + reconDBUpdateTasks.entrySet()) { + ReconDBUpdateTask task = taskEntry.getValue(); + tasks.add(() -> task.process(events)); + } + + List> results = executorService.invokeAll(tasks); + List failedTasks = processTaskResults(results, events); + + //Retry + List retryFailedTasks = new ArrayList<>(); + if (!failedTasks.isEmpty()) { + tasks.clear(); + for (String taskName : failedTasks) { + ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName); + tasks.add(() -> task.process(events)); + } + results = executorService.invokeAll(tasks); + retryFailedTasks = processTaskResults(results, events); + } + + //Reprocess + //TODO Move to a separate task queue since reprocess may be a heavy + //operation for large OM DB instances + if (!retryFailedTasks.isEmpty()) { + tasks.clear(); + for (String taskName : failedTasks) { + ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName); + tasks.add(() -> task.reprocess(omMetadataManager)); + } + results = executorService.invokeAll(tasks); + List reprocessFailedTasks = processTaskResults(results, events); + for (String taskName : reprocessFailedTasks) { + LOG.info("Reprocess step failed for task : " + taskName); + if (taskFailureCounter.get(taskName).incrementAndGet() > + TASK_FAILURE_THRESHOLD) { + LOG.info("Blacklisting Task since it failed retry and " + + "reprocess more than " + TASK_FAILURE_THRESHOLD + " times."); + reconDBUpdateTasks.remove(taskName); + } + } + } + } catch (ExecutionException e) { + LOG.error("Unexpected error : ", e); + } finally { + taskSemaphore.release(); + } + } + + /** + * Store the last completed event sequence number and timestamp to the DB + * for that task. + * @param taskName taskname to be updated. + * @param eventInfo contains the new sequence number and timestamp. + */ + private void storeLastCompletedTransaction( + String taskName, OMDBUpdateEvent.EventInfo eventInfo) { + ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName, + eventInfo.getEventTimestampMillis(), eventInfo.getSequenceNumber()); + reconTaskStatusDao.update(reconTaskStatusRecord); + } + + @Override + public Map getRegisteredTasks() { + return reconDBUpdateTasks; + } + + /** + * Wait on results of all tasks. + * @param results Set of Futures. + * @param events Events. + * @return List of failed task names + * @throws ExecutionException execution Exception + * @throws InterruptedException Interrupted Exception + */ + private List processTaskResults(List> results, + OMUpdateEventBatch events) + throws ExecutionException, InterruptedException { + List failedTasks = new ArrayList<>(); + for (Future f : results) { + String taskName = f.get().getLeft().toString(); + if (!(Boolean)f.get().getRight()) { + LOG.info("Failed task : " + taskName); + failedTasks.add(f.get().getLeft().toString()); + } else { + taskFailureCounter.get(taskName).set(0); + storeLastCompletedTransaction(taskName, events.getLastEventInfo()); + } + } + return failedTasks; + } +} diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java index bd7ea84b19..6363e9c4bd 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java @@ -190,8 +190,11 @@ protected void configure() { //Generate Recon container DB data. ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask( - ozoneManagerServiceProvider, containerDbServiceProvider); - containerKeyMapperTask.run(); + containerDbServiceProvider, + ozoneManagerServiceProvider.getOMMetadataManagerInstance()); + ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot(); + containerKeyMapperTask.reprocess(ozoneManagerServiceProvider + .getOMMetadataManagerInstance()); } @Test diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/AbstractSqlDatabaseTest.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/AbstractSqlDatabaseTest.java index 983f74a3c1..c2fbce25ca 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/AbstractSqlDatabaseTest.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/AbstractSqlDatabaseTest.java @@ -80,7 +80,10 @@ protected DSLContext getDslContext() { return dslContext; } - static class DataSourceConfigurationProvider implements + /** + * Local Sqlite datasource provider. + */ + public static class DataSourceConfigurationProvider implements Provider { @Override diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestReconInternalSchemaDefinition.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestReconInternalSchemaDefinition.java new file mode 100644 index 0000000000..150007ef58 --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestReconInternalSchemaDefinition.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.persistence; + +import static org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition.RECON_TASK_STATUS_TABLE_NAME; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + +import javax.sql.DataSource; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition; +import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; +import org.jooq.Configuration; +import org.junit.Assert; +import org.junit.Test; + +/** + * Class used to test ReconInternalSchemaDefinition. + */ +public class TestReconInternalSchemaDefinition extends AbstractSqlDatabaseTest { + + @Test + public void testSchemaCreated() throws Exception { + ReconInternalSchemaDefinition schemaDefinition = getInjector().getInstance( + ReconInternalSchemaDefinition.class); + + schemaDefinition.initializeSchema(); + + Connection connection = + getInjector().getInstance(DataSource.class).getConnection(); + // Verify table definition + DatabaseMetaData metaData = connection.getMetaData(); + ResultSet resultSet = metaData.getColumns(null, null, + RECON_TASK_STATUS_TABLE_NAME, null); + + List> expectedPairs = new ArrayList<>(); + + expectedPairs.add(new ImmutablePair<>("task_name", Types.VARCHAR)); + expectedPairs.add(new ImmutablePair<>("last_updated_timestamp", + Types.INTEGER)); + expectedPairs.add(new ImmutablePair<>("last_updated_seq_number", + Types.INTEGER)); + + List> actualPairs = new ArrayList<>(); + + while (resultSet.next()) { + actualPairs.add(new ImmutablePair<>( + resultSet.getString("COLUMN_NAME"), + resultSet.getInt("DATA_TYPE"))); + } + + Assert.assertEquals(3, actualPairs.size()); + Assert.assertEquals(expectedPairs, actualPairs); + } + + @Test + public void testReconTaskStatusCRUDOperations() throws Exception { + // Verify table exists + ReconInternalSchemaDefinition schemaDefinition = getInjector().getInstance( + ReconInternalSchemaDefinition.class); + + schemaDefinition.initializeSchema(); + + DataSource ds = getInjector().getInstance(DataSource.class); + Connection connection = ds.getConnection(); + + DatabaseMetaData metaData = connection.getMetaData(); + ResultSet resultSet = metaData.getTables(null, null, + RECON_TASK_STATUS_TABLE_NAME, null); + + while (resultSet.next()) { + Assert.assertEquals(RECON_TASK_STATUS_TABLE_NAME, + resultSet.getString("TABLE_NAME")); + } + + ReconTaskStatusDao dao = new ReconTaskStatusDao(getInjector().getInstance( + Configuration.class)); + + long now = System.currentTimeMillis(); + ReconTaskStatus newRecord = new ReconTaskStatus(); + newRecord.setTaskName("HelloWorldTask"); + newRecord.setLastUpdatedTimestamp(now); + newRecord.setLastUpdatedSeqNumber(100L); + + // Create + dao.insert(newRecord); + + ReconTaskStatus newRecord2 = new ReconTaskStatus(); + newRecord2.setTaskName("GoodbyeWorldTask"); + newRecord2.setLastUpdatedTimestamp(now); + newRecord2.setLastUpdatedSeqNumber(200L); + // Create + dao.insert(newRecord2); + + // Read + ReconTaskStatus dbRecord = dao.findById("HelloWorldTask"); + + Assert.assertEquals("HelloWorldTask", dbRecord.getTaskName()); + Assert.assertEquals(Long.valueOf(now), dbRecord.getLastUpdatedTimestamp()); + Assert.assertEquals(Long.valueOf(100), dbRecord.getLastUpdatedSeqNumber()); + + // Update + dbRecord.setLastUpdatedSeqNumber(150L); + dao.update(dbRecord); + + // Read updated + dbRecord = dao.findById("HelloWorldTask"); + Assert.assertEquals(Long.valueOf(150), dbRecord.getLastUpdatedSeqNumber()); + + // Delete + dao.deleteById("GoodbyeWorldTask"); + + // Verify + dbRecord = dao.findById("GoodbyeWorldTask"); + + Assert.assertNull(dbRecord); + } + +} diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/TestReconOmMetadataManagerImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/TestReconOmMetadataManagerImpl.java index ba2dd0b772..78d964dcb1 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/TestReconOmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/TestReconOmMetadataManagerImpl.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR; import java.io.File; -import java.io.IOException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -127,22 +126,6 @@ public void testUpdateOmDB() throws Exception { .get("/sampleVol/bucketOne/key_one")); Assert.assertNotNull(reconOMMetadataManager.getKeyTable() .get("/sampleVol/bucketOne/key_two")); - - //Verify that we cannot write data to Recon OM DB (Read Only) - try { - reconOMMetadataManager.getKeyTable().put( - "/sampleVol/bucketOne/fail_key", new OmKeyInfo.Builder() - .setBucketName("bucketOne") - .setVolumeName("sampleVol") - .setKeyName("fail_key") - .setReplicationFactor(HddsProtos.ReplicationFactor.ONE) - .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE) - .build()); - Assert.fail(); - } catch (IOException e) { - Assert.assertTrue(e.getMessage() - .contains("Not supported operation in read only mode")); - } } } \ No newline at end of file diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java index 75664f0357..85e5eed834 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java @@ -203,4 +203,29 @@ public void testGetKeyPrefixesForContainer() throws Exception { assertTrue(keyPrefixMap.size() == 1); assertTrue(keyPrefixMap.get(containerKeyPrefix3) == 3); } + + @Test + public void testDeleteContainerMapping() throws IOException { + long containerId = System.currentTimeMillis(); + + ContainerKeyPrefix containerKeyPrefix1 = new + ContainerKeyPrefix(containerId, "V3/B1/K1", 0); + containerDbServiceProvider.storeContainerKeyMapping(containerKeyPrefix1, + 1); + + ContainerKeyPrefix containerKeyPrefix2 = new ContainerKeyPrefix( + containerId, "V3/B1/K2", 0); + containerDbServiceProvider.storeContainerKeyMapping(containerKeyPrefix2, + 2); + + Map keyPrefixMap = + containerDbServiceProvider.getKeyPrefixesForContainer(containerId); + assertTrue(keyPrefixMap.size() == 2); + + containerDbServiceProvider.deleteContainerMapping(new ContainerKeyPrefix( + containerId, "V3/B1/K2", 0)); + keyPrefixMap = + containerDbServiceProvider.getKeyPrefixesForContainer(containerId); + assertTrue(keyPrefixMap.size() == 1); + } } diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java new file mode 100644 index 0000000000..3073907fc0 --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.ozone.om.OMMetadataManager; + +/** + * Dummy Recon task that has 3 modes of operations. + * ALWAYS_FAIL / FAIL_ONCE / ALWAYS_PASS + */ +public class DummyReconDBTask extends ReconDBUpdateTask { + + private int numFailuresAllowed = Integer.MIN_VALUE; + private int callCtr = 0; + + public DummyReconDBTask(String taskName, TaskType taskType) { + super(taskName); + if (taskType.equals(TaskType.FAIL_ONCE)) { + numFailuresAllowed = 1; + } else if (taskType.equals(TaskType.ALWAYS_FAIL)) { + numFailuresAllowed = Integer.MAX_VALUE; + } + } + + @Override + protected Collection getTaskTables() { + return Collections.singletonList("volumeTable"); + } + + @Override + Pair process(OMUpdateEventBatch events) { + if (++callCtr <= numFailuresAllowed) { + return new ImmutablePair<>(getTaskName(), false); + } else { + return new ImmutablePair<>(getTaskName(), true); + } + } + + @Override + Pair reprocess(OMMetadataManager omMetadataManager) { + if (++callCtr <= numFailuresAllowed) { + return new ImmutablePair<>(getTaskName(), false); + } else { + return new ImmutablePair<>(getTaskName(), true); + } + } + + /** + * Type of the task. + */ + public enum TaskType { + ALWAYS_PASS, + FAIL_ONCE, + ALWAYS_FAIL + } +} diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java index 6ee95e62a3..c67d7fd46f 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java @@ -23,10 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -34,9 +31,10 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest; @@ -48,14 +46,11 @@ import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider; -import org.apache.hadoop.utils.db.DBCheckpoint; import org.apache.hadoop.utils.db.DBStore; -import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -110,7 +105,7 @@ protected void configure() { } @Test - public void testRun() throws Exception{ + public void testReprocessOMDB() throws Exception{ Map keyPrefixesForContainer = containerDbServiceProvider.getKeyPrefixesForContainer(1); @@ -137,25 +132,17 @@ public void testRun() throws Exception{ OmKeyLocationInfoGroup omKeyLocationInfoGroup = new OmKeyLocationInfoGroup(0, omKeyLocationInfoList); - writeDataToOm(omMetadataManager, + writeDataToOm(reconOMMetadataManager, "key_one", "bucketOne", "sampleVol", Collections.singletonList(omKeyLocationInfoGroup)); - //Take snapshot of OM DB and copy over to Recon OM DB. - DBCheckpoint checkpoint = omMetadataManager.getStore() - .getCheckpoint(true); - File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation()); - InputStream inputStream = new FileInputStream(tarFile); - PowerMockito.stub(PowerMockito.method(ReconUtils.class, - "makeHttpCall", - CloseableHttpClient.class, String.class)) - .toReturn(inputStream); - - ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask( - ozoneManagerServiceProvider, containerDbServiceProvider); - containerKeyMapperTask.run(); + ContainerKeyMapperTask containerKeyMapperTask = + new ContainerKeyMapperTask(containerDbServiceProvider, + ozoneManagerServiceProvider.getOMMetadataManagerInstance()); + containerKeyMapperTask.reprocess(ozoneManagerServiceProvider + .getOMMetadataManagerInstance()); keyPrefixesForContainer = containerDbServiceProvider.getKeyPrefixesForContainer(1); @@ -176,6 +163,130 @@ public void testRun() throws Exception{ keyPrefixesForContainer.get(containerKeyPrefix).intValue()); } + @Test + public void testProcessOMEvents() throws IOException { + Map keyPrefixesForContainer = + containerDbServiceProvider.getKeyPrefixesForContainer(1); + assertTrue(keyPrefixesForContainer.isEmpty()); + + keyPrefixesForContainer = containerDbServiceProvider + .getKeyPrefixesForContainer(2); + assertTrue(keyPrefixesForContainer.isEmpty()); + + Pipeline pipeline = getRandomPipeline(); + + List omKeyLocationInfoList = new ArrayList<>(); + BlockID blockID1 = new BlockID(1, 1); + OmKeyLocationInfo omKeyLocationInfo1 = getOmKeyLocationInfo(blockID1, + pipeline); + + BlockID blockID2 = new BlockID(2, 1); + OmKeyLocationInfo omKeyLocationInfo2 + = getOmKeyLocationInfo(blockID2, pipeline); + + omKeyLocationInfoList.add(omKeyLocationInfo1); + omKeyLocationInfoList.add(omKeyLocationInfo2); + + OmKeyLocationInfoGroup omKeyLocationInfoGroup = new + OmKeyLocationInfoGroup(0, omKeyLocationInfoList); + + String bucket = "bucketOne"; + String volume = "sampleVol"; + String key = "key_one"; + String omKey = omMetadataManager.getOzoneKey(volume, bucket, key); + OmKeyInfo omKeyInfo = buildOmKeyInfo(volume, bucket, key, + omKeyLocationInfoGroup); + + OMDBUpdateEvent keyEvent1 = new OMDBUpdateEvent. + OMUpdateEventBuilder() + .setKey(omKey) + .setValue(omKeyInfo) + .setTable(omMetadataManager.getKeyTable().getName()) + .setAction(OMDBUpdateEvent.OMDBUpdateAction.PUT) + .build(); + + BlockID blockID3 = new BlockID(1, 2); + OmKeyLocationInfo omKeyLocationInfo3 = + getOmKeyLocationInfo(blockID3, pipeline); + + BlockID blockID4 = new BlockID(3, 1); + OmKeyLocationInfo omKeyLocationInfo4 + = getOmKeyLocationInfo(blockID4, pipeline); + + omKeyLocationInfoList = new ArrayList<>(); + omKeyLocationInfoList.add(omKeyLocationInfo3); + omKeyLocationInfoList.add(omKeyLocationInfo4); + omKeyLocationInfoGroup = new OmKeyLocationInfoGroup(0, + omKeyLocationInfoList); + + String key2 = "key_two"; + writeDataToOm(reconOMMetadataManager, key2, bucket, volume, Collections + .singletonList(omKeyLocationInfoGroup)); + + omKey = omMetadataManager.getOzoneKey(volume, bucket, key2); + OMDBUpdateEvent keyEvent2 = new OMDBUpdateEvent. + OMUpdateEventBuilder() + .setKey(omKey) + .setAction(OMDBUpdateEvent.OMDBUpdateAction.DELETE) + .setTable(omMetadataManager.getKeyTable().getName()) + .build(); + + OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(new + ArrayList() {{ + add(keyEvent1); + add(keyEvent2); + }}); + + ContainerKeyMapperTask containerKeyMapperTask = + new ContainerKeyMapperTask(containerDbServiceProvider, + ozoneManagerServiceProvider.getOMMetadataManagerInstance()); + containerKeyMapperTask.reprocess(ozoneManagerServiceProvider + .getOMMetadataManagerInstance()); + + keyPrefixesForContainer = containerDbServiceProvider + .getKeyPrefixesForContainer(1); + assertTrue(keyPrefixesForContainer.size() == 1); + + keyPrefixesForContainer = containerDbServiceProvider + .getKeyPrefixesForContainer(2); + assertTrue(keyPrefixesForContainer.isEmpty()); + + keyPrefixesForContainer = containerDbServiceProvider + .getKeyPrefixesForContainer(3); + assertTrue(keyPrefixesForContainer.size() == 1); + + // Process PUT & DELETE event. + containerKeyMapperTask.process(omUpdateEventBatch); + + keyPrefixesForContainer = containerDbServiceProvider + .getKeyPrefixesForContainer(1); + assertTrue(keyPrefixesForContainer.size() == 1); + + keyPrefixesForContainer = containerDbServiceProvider + .getKeyPrefixesForContainer(2); + assertTrue(keyPrefixesForContainer.size() == 1); + + keyPrefixesForContainer = containerDbServiceProvider + .getKeyPrefixesForContainer(3); + assertTrue(keyPrefixesForContainer.isEmpty()); + + } + + private OmKeyInfo buildOmKeyInfo(String volume, + String bucket, + String key, + OmKeyLocationInfoGroup + omKeyLocationInfoGroup) { + return new OmKeyInfo.Builder() + .setBucketName(bucket) + .setVolumeName(volume) + .setKeyName(key) + .setReplicationFactor(HddsProtos.ReplicationFactor.ONE) + .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE) + .setOmKeyLocationInfos(Collections.singletonList( + omKeyLocationInfoGroup)) + .build(); + } /** * Get Test OzoneConfiguration instance. * @return OzoneConfiguration diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java new file mode 100644 index 0000000000..98feaff4d4 --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.utils.db.RDBStore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.RocksDB; +import org.rocksdb.TransactionLogIterator; +import org.rocksdb.WriteBatch; + +/** + * Class used to test OMDBUpdatesHandler. + */ +public class TestOMDBUpdatesHandler { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private OzoneConfiguration createNewTestPath() throws IOException { + OzoneConfiguration configuration = new OzoneConfiguration(); + File newFolder = folder.newFolder(); + if (!newFolder.exists()) { + assertTrue(newFolder.mkdirs()); + } + ServerUtils.setOzoneMetaDirPath(configuration, newFolder.toString()); + return configuration; + } + + @Test + public void testPut() throws Exception { + OzoneConfiguration configuration = createNewTestPath(); + OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(configuration); + + String volumeKey = metaMgr.getVolumeKey("sampleVol"); + OmVolumeArgs args = + OmVolumeArgs.newBuilder() + .setVolume("sampleVol") + .setAdminName("bilbo") + .setOwnerName("bilbo") + .build(); + metaMgr.getVolumeTable().put(volumeKey, args); + + OmKeyInfo omKeyInfo = new OmKeyInfo.Builder() + .setBucketName("bucketOne") + .setVolumeName("sampleVol") + .setKeyName("key_one") + .setReplicationFactor(HddsProtos.ReplicationFactor.ONE) + .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE) + .build(); + + metaMgr.getKeyTable().put("/sampleVol/bucketOne/key_one", omKeyInfo); + RDBStore rdbStore = (RDBStore) metaMgr.getStore(); + + RocksDB rocksDB = rdbStore.getDb(); + TransactionLogIterator transactionLogIterator = + rocksDB.getUpdatesSince(0); + List writeBatches = new ArrayList<>(); + + while(transactionLogIterator.isValid()) { + TransactionLogIterator.BatchResult result = + transactionLogIterator.getBatch(); + result.writeBatch().markWalTerminationPoint(); + WriteBatch writeBatch = result.writeBatch(); + writeBatches.add(writeBatch.data()); + transactionLogIterator.next(); + } + + OzoneConfiguration conf2 = createNewTestPath(); + OmMetadataManagerImpl reconOmmetaMgr = new OmMetadataManagerImpl(conf2); + List events = new ArrayList<>(); + for (byte[] data : writeBatches) { + WriteBatch writeBatch = new WriteBatch(data); + OMDBUpdatesHandler omdbUpdatesHandler = + new OMDBUpdatesHandler(reconOmmetaMgr); + writeBatch.iterate(omdbUpdatesHandler); + events.addAll(omdbUpdatesHandler.getEvents()); + } + assertNotNull(events); + assertTrue(events.size() == 2); + + OMDBUpdateEvent volEvent = events.get(0); + assertEquals(OMDBUpdateEvent.OMDBUpdateAction.PUT, volEvent.getAction()); + assertEquals(volumeKey, volEvent.getKey()); + assertEquals(args.getVolume(), ((OmVolumeArgs)volEvent.getValue()) + .getVolume()); + + OMDBUpdateEvent keyEvent = events.get(1); + assertEquals(OMDBUpdateEvent.OMDBUpdateAction.PUT, keyEvent.getAction()); + assertEquals("/sampleVol/bucketOne/key_one", keyEvent.getKey()); + assertEquals(omKeyInfo.getBucketName(), + ((OmKeyInfo)keyEvent.getValue()).getBucketName()); + } + + @Test + public void testDelete() throws Exception { + OzoneConfiguration configuration = createNewTestPath(); + OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(configuration); + + String volumeKey = metaMgr.getVolumeKey("sampleVol"); + OmVolumeArgs args = + OmVolumeArgs.newBuilder() + .setVolume("sampleVol") + .setAdminName("bilbo") + .setOwnerName("bilbo") + .build(); + metaMgr.getVolumeTable().put(volumeKey, args); + + OmKeyInfo omKeyInfo = new OmKeyInfo.Builder() + .setBucketName("bucketOne") + .setVolumeName("sampleVol") + .setKeyName("key_one") + .setReplicationFactor(HddsProtos.ReplicationFactor.ONE) + .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE) + .build(); + + metaMgr.getKeyTable().put("/sampleVol/bucketOne/key_one", omKeyInfo); + + metaMgr.getKeyTable().delete("/sampleVol/bucketOne/key_one"); + metaMgr.getVolumeTable().delete(volumeKey); + + RDBStore rdbStore = (RDBStore) metaMgr.getStore(); + + RocksDB rocksDB = rdbStore.getDb(); + TransactionLogIterator transactionLogIterator = + rocksDB.getUpdatesSince(0); + List writeBatches = new ArrayList<>(); + + while(transactionLogIterator.isValid()) { + TransactionLogIterator.BatchResult result = + transactionLogIterator.getBatch(); + result.writeBatch().markWalTerminationPoint(); + WriteBatch writeBatch = result.writeBatch(); + writeBatches.add(writeBatch.data()); + transactionLogIterator.next(); + } + + OzoneConfiguration conf2 = createNewTestPath(); + OmMetadataManagerImpl reconOmmetaMgr = new OmMetadataManagerImpl(conf2); + List events = new ArrayList<>(); + for (byte[] data : writeBatches) { + WriteBatch writeBatch = new WriteBatch(data); + OMDBUpdatesHandler omdbUpdatesHandler = + new OMDBUpdatesHandler(reconOmmetaMgr); + writeBatch.iterate(omdbUpdatesHandler); + events.addAll(omdbUpdatesHandler.getEvents()); + } + assertNotNull(events); + assertTrue(events.size() == 4); + + OMDBUpdateEvent keyEvent = events.get(2); + assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE, keyEvent.getAction()); + assertEquals("/sampleVol/bucketOne/key_one", keyEvent.getKey()); + + OMDBUpdateEvent volEvent = events.get(3); + assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE, volEvent.getAction()); + assertEquals(volumeKey, volEvent.getKey()); + } + + @Test + public void testGetValueType() throws IOException { + OzoneConfiguration configuration = createNewTestPath(); + OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(configuration); + OMDBUpdatesHandler omdbUpdatesHandler = + new OMDBUpdatesHandler(metaMgr); + + assertEquals(OmKeyInfo.class, omdbUpdatesHandler.getValueType( + metaMgr.getKeyTable().getName())); + assertEquals(OmVolumeArgs.class, omdbUpdatesHandler.getValueType( + metaMgr.getVolumeTable().getName())); + assertEquals(OmBucketInfo.class, omdbUpdatesHandler.getValueType( + metaMgr.getBucketTable().getName())); + } +} \ No newline at end of file diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java new file mode 100644 index 0000000000..b587c89f5b --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.Collections; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.recon.persistence.AbstractSqlDatabaseTest; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl; +import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition; +import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; +import org.jooq.Configuration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Class used to test ReconTaskControllerImpl. + */ +public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest { + + private ReconTaskController reconTaskController; + + private Configuration sqlConfiguration; + @Before + public void setUp() throws Exception { + + File omDbDir = temporaryFolder.newFolder(); + OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); + ozoneConfiguration.set(OZONE_OM_DB_DIRS, omDbDir.getAbsolutePath()); + ReconOMMetadataManager omMetadataManager = new ReconOmMetadataManagerImpl( + ozoneConfiguration); + + sqlConfiguration = getInjector() + .getInstance(Configuration.class); + + ReconInternalSchemaDefinition schemaDefinition = getInjector(). + getInstance(ReconInternalSchemaDefinition.class); + schemaDefinition.initializeSchema(); + + reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration, + omMetadataManager, sqlConfiguration); + } + + @Test + public void testRegisterTask() throws Exception { + String taskName = "Dummy_" + System.currentTimeMillis(); + DummyReconDBTask dummyReconDBTask = + new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_PASS); + reconTaskController.registerTask(dummyReconDBTask); + assertTrue(reconTaskController.getRegisteredTasks().size() == 1); + assertTrue(reconTaskController.getRegisteredTasks() + .get(dummyReconDBTask.getTaskName()) == dummyReconDBTask); + } + + @Test + public void testConsumeOMEvents() throws Exception { + + ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class); + when(reconDBUpdateTaskMock.getTaskTables()).thenReturn(Collections + .EMPTY_LIST); + when(reconDBUpdateTaskMock.getTaskName()).thenReturn("MockTask"); + when(reconDBUpdateTaskMock.process(any(OMUpdateEventBatch.class))) + .thenReturn(new ImmutablePair<>("MockTask", true)); + reconTaskController.registerTask(reconDBUpdateTaskMock); + reconTaskController.consumeOMEvents( + new OMUpdateEventBatch(Collections.emptyList())); + + verify(reconDBUpdateTaskMock, times(1)) + .process(any()); + } + + @Test + public void testFailedTaskRetryLogic() throws Exception { + String taskName = "Dummy_" + System.currentTimeMillis(); + DummyReconDBTask dummyReconDBTask = + new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.FAIL_ONCE); + reconTaskController.registerTask(dummyReconDBTask); + + + long currentTime = System.nanoTime(); + OMDBUpdateEvent.EventInfo eventInfoMock = mock( + OMDBUpdateEvent.EventInfo.class); + when(eventInfoMock.getSequenceNumber()).thenReturn(100L); + when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime); + + OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); + when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock); + + reconTaskController.consumeOMEvents(omUpdateEventBatchMock); + assertFalse(reconTaskController.getRegisteredTasks().isEmpty()); + assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks() + .get(dummyReconDBTask.getTaskName())); + + ReconTaskStatusDao dao = new ReconTaskStatusDao(sqlConfiguration); + ReconTaskStatus dbRecord = dao.findById(taskName); + + Assert.assertEquals(taskName, dbRecord.getTaskName()); + Assert.assertEquals(Long.valueOf(currentTime), + dbRecord.getLastUpdatedTimestamp()); + Assert.assertEquals(Long.valueOf(100L), dbRecord.getLastUpdatedSeqNumber()); + } + + @Test + public void testBadBehavedTaskBlacklisting() throws Exception { + String taskName = "Dummy_" + System.currentTimeMillis(); + DummyReconDBTask dummyReconDBTask = + new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_FAIL); + reconTaskController.registerTask(dummyReconDBTask); + + + long currentTime = System.nanoTime(); + OMDBUpdateEvent.EventInfo eventInfoMock = + mock(OMDBUpdateEvent.EventInfo.class); + when(eventInfoMock.getSequenceNumber()).thenReturn(100L); + when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime); + + OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); + when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock); + + for (int i = 0; i < 2; i++) { + reconTaskController.consumeOMEvents(omUpdateEventBatchMock); + + assertFalse(reconTaskController.getRegisteredTasks().isEmpty()); + assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks() + .get(dummyReconDBTask.getTaskName())); + } + + //Should be blacklisted now. + reconTaskController.consumeOMEvents( + new OMUpdateEventBatch(Collections.emptyList())); + assertTrue(reconTaskController.getRegisteredTasks().isEmpty()); + + ReconTaskStatusDao dao = new ReconTaskStatusDao(sqlConfiguration); + ReconTaskStatus dbRecord = dao.findById(taskName); + + Assert.assertEquals(taskName, dbRecord.getTaskName()); + Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedTimestamp()); + Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedSeqNumber()); + } +} \ No newline at end of file