diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java index 54ebc7ca6c..c4dca6a13a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java @@ -41,6 +41,10 @@ public class DBUpdatesWrapper { return dataList; } + public void setCurrentSequenceNumber(long sequenceNumber) { + this.currentSequenceNumber = sequenceNumber; + } + public long getCurrentSequenceNumber() { return currentSequenceNumber; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java index a8b78ed8a9..19699f587c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java @@ -37,6 +37,10 @@ public class RDBBatchOperation implements BatchOperation { writeBatch = new WriteBatch(); } + public RDBBatchOperation(WriteBatch writeBatch) { + this.writeBatch = writeBatch; + } + public void commit(RocksDB db, WriteOptions writeOptions) throws IOException { try { db.write(writeOptions, writeBatch); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index 3d4dd937e7..f6a4e64e71 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo; import java.io.Closeable; @@ -49,6 +50,8 @@ import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector; import org.apache.hadoop.ozone.security.acl.OzoneObj; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.utils.db.DBUpdatesWrapper; +import org.apache.hadoop.utils.db.SequenceNumberNotFoundException; /** * Protocol to talk to OM. @@ -505,4 +508,14 @@ public interface OzoneManagerProtocol * */ List getAcl(OzoneObj obj) throws IOException; + /** + * Get DB updates since a specific sequence number. + * @param dbUpdatesRequest request that encapsulates a sequence number. + * @return Wrapper containing the updates. + * @throws SequenceNumberNotFoundException if db is unable to read the data. + */ + DBUpdatesWrapper getDBUpdates( + OzoneManagerProtocolProtos.DBUpdatesRequest dbUpdatesRequest) + throws IOException; + } \ No newline at end of file diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index f76154871e..83c5316089 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -66,7 +66,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateF import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse; @@ -138,11 +140,14 @@ import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.utils.db.DBUpdatesWrapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1396,7 +1401,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB OMRequest omRequest = createOMRequest(Type.SetAcl) .setSetAclRequest(builder.build()) .build(); - OzoneManagerProtocolProtos.SetAclResponse response = + SetAclResponse response = handleError(submitRequest(omRequest)).getSetAclResponse(); return response.getResponse(); @@ -1425,6 +1430,25 @@ public final class OzoneManagerProtocolClientSideTranslatorPB return acls; } + @Override + public DBUpdatesWrapper getDBUpdates(DBUpdatesRequest dbUpdatesRequest) + throws IOException { + OMRequest omRequest = createOMRequest(Type.DBUpdates) + .setDbUpdatesRequest(dbUpdatesRequest) + .build(); + + DBUpdatesResponse dbUpdatesResponse = + handleError(submitRequest(omRequest)).getDbUpdatesResponse(); + + DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper(); + for (ByteString byteString : dbUpdatesResponse.getDataList()) { + dbUpdatesWrapper.addWriteBatch(byteString.toByteArray(), 0L); + } + dbUpdatesWrapper.setCurrentSequenceNumber( + dbUpdatesResponse.getSequenceNumber()); + return dbUpdatesWrapper; + } + @Override public OpenKeySession createFile(OmKeyArgs args, boolean overWrite, boolean recursive) throws IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index a81b0de5c8..dbd5d39881 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -3422,6 +3422,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl * @return Wrapper containing the updates. * @throws SequenceNumberNotFoundException if db is unable to read the data. */ + @Override public DBUpdatesWrapper getDBUpdates( DBUpdatesRequest dbUpdatesRequest) throws SequenceNumberNotFoundException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index 7c9f12643f..c2d62278b8 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -394,9 +394,10 @@ public class OzoneManagerRequestHandler implements RequestHandler { DBUpdatesWrapper dbUpdatesWrapper = impl.getDBUpdates(dbUpdatesRequest); for (int i = 0; i < dbUpdatesWrapper.getData().size(); i++) { - builder.setData(i, - OMPBHelper.getByteString(dbUpdatesWrapper.getData().get(i))); + builder.addData(OMPBHelper.getByteString( + dbUpdatesWrapper.getData().get(i))); } + builder.setSequenceNumber(dbUpdatesWrapper.getCurrentSequenceNumber()); return builder.build(); } diff --git a/hadoop-ozone/ozone-recon/pom.xml b/hadoop-ozone/ozone-recon/pom.xml index 9672796a84..130ad353f4 100644 --- a/hadoop-ozone/ozone-recon/pom.xml +++ b/hadoop-ozone/ozone-recon/pom.xml @@ -272,30 +272,6 @@ 2.8.9 test - - org.powermock - powermock-module-junit4 - 1.7.4 - test - - - org.javassist - javassist - - - - - org.powermock - powermock-api-mockito2 - 1.7.4 - test - - - org.mockito - mockito-core - - - org.jooq jooq diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java index e7c20f0755..21bc5becfa 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java @@ -29,8 +29,13 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQL_MAX_IDLE_CONNECTION_AGE; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQL_MAX_IDLE_CONNECTION_TEST_STMT; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration; import org.apache.hadoop.ozone.recon.persistence.JooqPersistenceModule; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; @@ -40,9 +45,15 @@ import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider; import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider; import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask; +import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask; import org.apache.hadoop.ozone.recon.tasks.ReconTaskController; import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.utils.db.DBStore; +import org.apache.ratis.protocol.ClientId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.inject.AbstractModule; import com.google.inject.Provides; @@ -52,6 +63,9 @@ import com.google.inject.Singleton; * Guice controller that defines concrete bindings. */ public class ReconControllerModule extends AbstractModule { + private static final Logger LOG = + LoggerFactory.getLogger(ReconControllerModule.class); + @Override protected void configure() { bind(Configuration.class).toProvider(ConfigurationProvider.class); @@ -60,17 +74,37 @@ public class ReconControllerModule extends AbstractModule { .toProvider(ReconContainerDBProvider.class).in(Singleton.class); bind(ReconOMMetadataManager.class) .to(ReconOmMetadataManagerImpl.class).in(Singleton.class); + bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class) + .in(Singleton.class); bind(ContainerDBServiceProvider.class) .to(ContainerDBServiceProviderImpl.class).in(Singleton.class); bind(OzoneManagerServiceProvider.class) .to(OzoneManagerServiceProviderImpl.class).in(Singleton.class); - + bind(ReconUtils.class).in(Singleton.class); // Persistence - inject configuration provider install(new JooqPersistenceModule( getProvider(DataSourceConfiguration.class))); bind(ReconTaskController.class) .to(ReconTaskControllerImpl.class).in(Singleton.class); + bind(ContainerKeyMapperTask.class); + bind(FileSizeCountTask.class); + } + + @Provides + OzoneManagerProtocol getOzoneManagerProtocol( + final OzoneConfiguration ozoneConfiguration) { + OzoneManagerProtocol ozoneManagerClient = null; + try { + ClientId clientId = ClientId.randomId(); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + ozoneManagerClient = new + OzoneManagerProtocolClientSideTranslatorPB( + ozoneConfiguration, clientId.toString(), ugi); + } catch (IOException ioEx) { + LOG.error("Error in provisioning OzoneManagerProtocol ", ioEx); + } + return ozoneManagerClient; } @Provides diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java index a11cb5f881..1aaf8873d5 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java @@ -18,26 +18,15 @@ package org.apache.hadoop.ozone.recon; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL; -import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT; - -import java.io.IOException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.cli.GenericCli; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider; import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider; -import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask; -import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask; import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition; import org.hadoop.ozone.recon.schema.StatsSchemaDefinition; import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition; -import org.jooq.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,13 +58,15 @@ public class ReconServer extends GenericCli { ConfigurationProvider.setConfiguration(ozoneConfiguration); injector = Guice.createInjector(new - ReconControllerModule(), new ReconRestServletModule() { + ReconControllerModule(), + new ReconRestServletModule() { @Override protected void configureServlets() { rest("/api/*") .packages("org.apache.hadoop.ozone.recon.api"); } - }); + }, + new ReconTaskBindingModule()); //Pass on injector to listener that does the Guice - Jersey HK2 bridging. ReconGuiceServletContextListener.setInjector(injector); @@ -95,14 +86,20 @@ public class ReconServer extends GenericCli { reconInternalSchemaDefinition.initializeSchema(); LOG.info("Recon server initialized successfully!"); + + httpServer = injector.getInstance(ReconHttpServer.class); + LOG.info("Starting Recon server"); + httpServer.start(); + + //Start Ozone Manager Service that pulls data from OM. + OzoneManagerServiceProvider ozoneManagerServiceProvider = injector + .getInstance(OzoneManagerServiceProvider.class); + ozoneManagerServiceProvider.start(); } catch (Exception e) { LOG.error("Error during initializing Recon server.", e); + stop(); } - httpServer = injector.getInstance(ReconHttpServer.class); - LOG.info("Starting Recon server"); - httpServer.start(); - scheduleReconTasks(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { stop(); @@ -113,52 +110,6 @@ public class ReconServer extends GenericCli { return null; } - /** - * Schedule the tasks that is required by Recon to keep its metadata up to - * date. - */ - private void scheduleReconTasks() { - OzoneConfiguration configuration = injector.getInstance( - OzoneConfiguration.class); - ContainerDBServiceProvider containerDBServiceProvider = injector - .getInstance(ContainerDBServiceProvider.class); - OzoneManagerServiceProvider ozoneManagerServiceProvider = injector - .getInstance(OzoneManagerServiceProvider.class); - Configuration sqlConfiguration = injector.getInstance(Configuration.class); - long initialDelay = configuration.getTimeDuration( - RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY, - RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT, - TimeUnit.MILLISECONDS); - long interval = configuration.getTimeDuration( - RECON_OM_SNAPSHOT_TASK_INTERVAL, - RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - - - scheduler.scheduleWithFixedDelay(() -> { - try { - ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot(); - // Schedule the task to read OM DB and write the reverse mapping to - // Recon container DB. - ContainerKeyMapperTask containerKeyMapperTask = - new ContainerKeyMapperTask(containerDBServiceProvider, - ozoneManagerServiceProvider.getOMMetadataManagerInstance()); - containerKeyMapperTask.reprocess( - ozoneManagerServiceProvider.getOMMetadataManagerInstance()); - FileSizeCountTask fileSizeCountTask = new - FileSizeCountTask( - ozoneManagerServiceProvider.getOMMetadataManagerInstance(), - sqlConfiguration); - fileSizeCountTask.reprocess( - ozoneManagerServiceProvider.getOMMetadataManagerInstance()); - - } catch (IOException e) { - LOG.error("Unable to get OM " + - "Snapshot", e); - } - }, initialDelay, interval, TimeUnit.MILLISECONDS); - } - void stop() throws Exception { LOG.info("Stopping Recon server"); httpServer.stop(); diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index 0501093ae0..034af4a527 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -114,7 +114,7 @@ public final class ReconServerConfigKeys { public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY = "ozone.recon.task.thread.count"; - public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 1; + public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 5; /** * Private constructor for utility class. diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconTaskBindingModule.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconTaskBindingModule.java new file mode 100644 index 0000000000..19cc0da23e --- /dev/null +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconTaskBindingModule.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon; + +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask; +import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask; +import org.apache.hadoop.ozone.recon.tasks.ReconDBUpdateTask; + +import com.google.inject.AbstractModule; +import com.google.inject.multibindings.Multibinder; + +/** + * Binds the various Recon Tasks. + */ +public class ReconTaskBindingModule extends AbstractModule { + + @Override + protected void configure() { + Multibinder taskBinder = + Multibinder.newSetBinder(binder(), ReconDBUpdateTask.class); + taskBinder.addBinding().to(ContainerKeyMapperTask.class); + taskBinder.addBinding().to(FileSizeCountTask.class); + } +} diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java index 324a369710..95e6f9b00a 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java @@ -51,11 +51,11 @@ import org.slf4j.LoggerFactory; /** * Recon Utility class. */ -public final class ReconUtils { +public class ReconUtils { private final static int WRITE_BUFFER = 1048576; //1MB - private ReconUtils() { + public ReconUtils() { } private static final Logger LOG = LoggerFactory.getLogger( @@ -69,7 +69,7 @@ public final class ReconUtils { * @param dirConfigKey key to check * @return Return File based on configured or fallback value. */ - public static File getReconDbDir(Configuration conf, String dirConfigKey) { + public File getReconDbDir(Configuration conf, String dirConfigKey) { File metadataDir = getDirectoryFromConfig(conf, dirConfigKey, "Recon"); @@ -90,7 +90,7 @@ public final class ReconUtils { * @param destPath destination path to untar to. * @throws IOException ioException */ - public static void untarCheckpointFile(File tarFile, Path destPath) + public void untarCheckpointFile(File tarFile, Path destPath) throws IOException { FileInputStream fileInputStream = null; @@ -153,7 +153,7 @@ public final class ReconUtils { * @return Inputstream to the response of the HTTP call. * @throws IOException While reading the response. */ - public static InputStream makeHttpCall(CloseableHttpClient httpClient, + public InputStream makeHttpCall(CloseableHttpClient httpClient, String url) throws IOException { diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java index 420f333ad9..3f57af6f5d 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java @@ -18,8 +18,6 @@ package org.apache.hadoop.ozone.recon.spi; * limitations under the License. */ -import java.io.IOException; - import org.apache.hadoop.ozone.om.OMMetadataManager; /** @@ -28,14 +26,14 @@ import org.apache.hadoop.ozone.om.OMMetadataManager; public interface OzoneManagerServiceProvider { /** - * Initialize Ozone Manager Service Provider Impl. + * Start a task to sync data from OM. */ - void init() throws IOException; + void start(); /** - * Update Recon OM DB with new snapshot from OM. + * Stop the OM sync data. */ - void updateReconOmDBWithNewSnapshot() throws IOException; + void stop(); /** * Return instance of OM Metadata manager. diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java index 42aab2e779..b1532fa8ad 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java @@ -37,6 +37,7 @@ import javax.inject.Singleton; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata; import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider; @@ -73,6 +74,9 @@ public class ContainerDBServiceProviderImpl @Inject private Configuration sqlConfiguration; + @Inject + private ReconUtils reconUtils; + @Inject public ContainerDBServiceProviderImpl(DBStore dbStore, Configuration sqlConfiguration) { @@ -101,7 +105,8 @@ public class ContainerDBServiceProviderImpl throws IOException { File oldDBLocation = containerDbStore.getDbLocation(); - containerDbStore = ReconContainerDBProvider.getNewDBStore(configuration); + containerDbStore = ReconContainerDBProvider + .getNewDBStore(configuration, reconUtils); containerKeyTable = containerDbStore.getTable(CONTAINER_KEY_TABLE, ContainerKeyPrefix.class, Integer.class); diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 389be1b12f..e7da3a0079 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -27,35 +27,54 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNE import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT_DEFAULT; -import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir; -import static org.apache.hadoop.ozone.recon.ReconUtils.makeHttpCall; -import static org.apache.hadoop.ozone.recon.ReconUtils.untarCheckpointFile; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.inject.Inject; import javax.inject.Singleton; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest; +import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider; +import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler; +import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch; +import org.apache.hadoop.ozone.recon.tasks.ReconTaskController; import org.apache.hadoop.utils.db.DBCheckpoint; +import org.apache.hadoop.utils.db.DBUpdatesWrapper; +import org.apache.hadoop.utils.db.RDBBatchOperation; +import org.apache.hadoop.utils.db.RDBStore; import org.apache.hadoop.utils.db.RocksDBCheckpoint; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.ratis.protocol.ClientId; +import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,11 +94,28 @@ public class OzoneManagerServiceProviderImpl private File omSnapshotDBParentDir = null; private String omDBSnapshotUrl; - @Inject + private OzoneManagerProtocol ozoneManagerClient; + private final ClientId clientId = ClientId.randomId(); + private final OzoneConfiguration configuration; + private final ScheduledExecutorService scheduler = + Executors.newScheduledThreadPool(1); + private ReconOMMetadataManager omMetadataManager; + private ReconTaskController reconTaskController; + private ReconTaskStatusDao reconTaskStatusDao; + private ReconUtils reconUtils; + private enum OmSnapshotTaskName { + OM_DB_FULL_SNAPSHOT, + OM_DB_DELTA_UPDATES + } @Inject - public OzoneManagerServiceProviderImpl(Configuration configuration) { + public OzoneManagerServiceProviderImpl( + OzoneConfiguration configuration, + ReconOMMetadataManager omMetadataManager, + ReconTaskController reconTaskController, + ReconUtils reconUtils, + OzoneManagerProtocol ozoneManagerClient) throws IOException { String ozoneManagerHttpAddress = configuration.get(OMConfigKeys .OZONE_OM_HTTP_ADDRESS_KEY); @@ -87,7 +123,7 @@ public class OzoneManagerServiceProviderImpl String ozoneManagerHttpsAddress = configuration.get(OMConfigKeys .OZONE_OM_HTTPS_ADDRESS_KEY); - omSnapshotDBParentDir = getReconDbDir(configuration, + omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration, OZONE_RECON_OM_SNAPSHOT_DB_DIR); HttpConfig.Policy policy = DFSUtil.getHttpPolicy(configuration); @@ -127,17 +163,81 @@ public class OzoneManagerServiceProviderImpl omDBSnapshotUrl += "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true"; } + this.reconUtils = reconUtils; + this.omMetadataManager = omMetadataManager; + this.reconTaskController = reconTaskController; + this.reconTaskStatusDao = reconTaskController.getReconTaskStatusDao(); + this.ozoneManagerClient = ozoneManagerClient; + this.configuration = configuration; } @Override - public void init() throws IOException { - updateReconOmDBWithNewSnapshot(); + public OMMetadataManager getOMMetadataManagerInstance() { + return omMetadataManager; } @Override - public void updateReconOmDBWithNewSnapshot() throws IOException { - //Obtain the current DB snapshot from OM and - //update the in house OM metadata managed DB instance. + public void start() { + long initialDelay = configuration.getTimeDuration( + RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY, + RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT, + TimeUnit.MILLISECONDS); + long interval = configuration.getTimeDuration( + RECON_OM_SNAPSHOT_TASK_INTERVAL, + RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + scheduler.scheduleWithFixedDelay(this::syncDataFromOM, + initialDelay, + interval, + TimeUnit.MILLISECONDS); + } + + @Override + public void stop() { + reconTaskController.stop(); + scheduler.shutdownNow(); + } + + /** + * Method to obtain current OM DB Snapshot. + * @return DBCheckpoint instance. + */ + @VisibleForTesting + DBCheckpoint getOzoneManagerDBSnapshot() { + String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System + .currentTimeMillis(); + File targetFile = new File(omSnapshotDBParentDir, snapshotFileName + + ".tar.gz"); + try { + try (InputStream inputStream = reconUtils.makeHttpCall(httpClient, + omDBSnapshotUrl)) { + FileUtils.copyInputStreamToFile(inputStream, targetFile); + } + + // Untar the checkpoint file. + Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(), + snapshotFileName); + reconUtils.untarCheckpointFile(targetFile, untarredDbDir); + FileUtils.deleteQuietly(targetFile); + + // TODO Create Checkpoint based on OM DB type. + // Currently, OM DB type is not configurable. Hence, defaulting to + // RocksDB. + return new RocksDBCheckpoint(untarredDbDir); + } catch (IOException e) { + LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", e); + } + return null; + } + + /** + * Update Local OM DB with new OM DB snapshot. + * @throws IOException + */ + @VisibleForTesting + void updateReconOmDBWithNewSnapshot() throws IOException { + // Obtain the current DB snapshot from OM and + // update the in house OM metadata managed DB instance. DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot(); if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) { try { @@ -151,41 +251,97 @@ public class OzoneManagerServiceProviderImpl } } - @Override - public OMMetadataManager getOMMetadataManagerInstance() { - return omMetadataManager; + /** + * Get Delta updates from OM through RPC call and apply to local OM DB as + * well as accumulate in a buffer. + * @param fromSequenceNumber from sequence number to request from. + * @param omdbUpdatesHandler OM DB updates handler to buffer updates. + * @throws IOException when OM RPC request fails. + * @throws RocksDBException when writing to RocksDB fails. + */ + @VisibleForTesting + void getAndApplyDeltaUpdatesFromOM( + long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler) + throws IOException, RocksDBException { + DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder() + .setSequenceNumber(fromSequenceNumber).build(); + DBUpdatesWrapper dbUpdates = ozoneManagerClient.getDBUpdates( + dbUpdatesRequest); + if (null != dbUpdates) { + RDBStore rocksDBStore = (RDBStore)omMetadataManager.getStore(); + RocksDB rocksDB = rocksDBStore.getDb(); + LOG.debug("Number of updates received from OM : " + + dbUpdates.getData().size()); + for (byte[] data : dbUpdates.getData()) { + WriteBatch writeBatch = new WriteBatch(data); + writeBatch.iterate(omdbUpdatesHandler); + RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(writeBatch); + rdbBatchOperation.commit(rocksDB, new WriteOptions()); + } + } } /** - * Method to obtain current OM DB Snapshot. - * @return DBCheckpoint instance. + * Based on current state of Recon's OM DB, we either get delta updates or + * full snapshot from Ozone Manager. */ @VisibleForTesting - protected DBCheckpoint getOzoneManagerDBSnapshot() { - String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System - .currentTimeMillis(); - File targetFile = new File(omSnapshotDBParentDir, snapshotFileName + - ".tar.gz"); - try { - try (InputStream inputStream = makeHttpCall(httpClient, - omDBSnapshotUrl)) { - FileUtils.copyInputStreamToFile(inputStream, targetFile); + void syncDataFromOM() { + long currentSequenceNumber = getCurrentOMDBSequenceNumber(); + boolean fullSnapshot = false; + + if (currentSequenceNumber <= 0) { + fullSnapshot = true; + } else { + OMDBUpdatesHandler omdbUpdatesHandler = + new OMDBUpdatesHandler(omMetadataManager); + try { + // Get updates from OM and apply to local Recon OM DB. + getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, + omdbUpdatesHandler); + // Update timestamp of successful delta updates query. + ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus( + OmSnapshotTaskName.OM_DB_DELTA_UPDATES.name(), + System.currentTimeMillis(), getCurrentOMDBSequenceNumber()); + reconTaskStatusDao.update(reconTaskStatusRecord); + // Pass on DB update events to tasks that are listening. + reconTaskController.consumeOMEvents(new OMUpdateEventBatch( + omdbUpdatesHandler.getEvents()), omMetadataManager); + } catch (IOException | InterruptedException | RocksDBException e) { + LOG.warn("Unable to get and apply delta updates from OM.", e); + fullSnapshot = true; } - - //Untar the checkpoint file. - Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(), - snapshotFileName); - untarCheckpointFile(targetFile, untarredDbDir); - FileUtils.deleteQuietly(targetFile); - - //TODO Create Checkpoint based on OM DB type. - // Currently, OM DB type is not configurable. Hence, defaulting to - // RocksDB. - return new RocksDBCheckpoint(untarredDbDir); - } catch (IOException e) { - LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", e); } - return null; + + if (fullSnapshot) { + try { + // Update local Recon OM DB to new snapshot. + updateReconOmDBWithNewSnapshot(); + // Update timestamp of successful delta updates query. + ReconTaskStatus reconTaskStatusRecord = + new ReconTaskStatus( + OmSnapshotTaskName.OM_DB_FULL_SNAPSHOT.name(), + System.currentTimeMillis(), getCurrentOMDBSequenceNumber()); + reconTaskStatusDao.update(reconTaskStatusRecord); + // Reinitialize tasks that are listening. + reconTaskController.reInitializeTasks(omMetadataManager); + } catch (IOException | InterruptedException e) { + LOG.error("Unable to update Recon's OM DB with new snapshot ", e); + } + } + } + + /** + * Get OM RocksDB's latest sequence number. + * @return latest sequence number. + */ + private long getCurrentOMDBSequenceNumber() { + RDBStore rocksDBStore = (RDBStore)omMetadataManager.getStore(); + if (null == rocksDBStore) { + return 0; + } else { + return rocksDBStore.getDb().getLatestSequenceNumber(); + } } } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java index de5c030c83..9b99f70984 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java @@ -22,11 +22,11 @@ import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_COUNT_T import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_CONTAINER_DB; import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_TABLE; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR; -import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir; import java.nio.file.Path; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; import org.apache.hadoop.utils.db.DBStore; import org.apache.hadoop.utils.db.DBStoreBuilder; @@ -52,9 +52,12 @@ public class ReconContainerDBProvider implements Provider { @Inject private OzoneConfiguration configuration; + @Inject + private ReconUtils reconUtils; + @Override public DBStore get() { - DBStore dbStore = getNewDBStore(configuration); + DBStore dbStore = getNewDBStore(configuration, reconUtils); if (dbStore == null) { throw new ProvisionException("Unable to provide instance of DBStore " + "store."); @@ -62,11 +65,13 @@ public class ReconContainerDBProvider implements Provider { return dbStore; } - public static DBStore getNewDBStore(OzoneConfiguration configuration) { + public static DBStore getNewDBStore(OzoneConfiguration configuration, + ReconUtils reconUtils) { DBStore dbStore = null; String dbName = RECON_CONTAINER_DB + "_" + System.currentTimeMillis(); try { - Path metaDir = getReconDbDir(configuration, OZONE_RECON_DB_DIR).toPath(); + Path metaDir = reconUtils.getReconDbDir( + configuration, OZONE_RECON_DB_DIR).toPath(); dbStore = DBStoreBuilder.newBuilder(configuration) .setPath(metaDir) .setName(dbName) diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java index ec8695cbc7..18a1e384cd 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java @@ -18,11 +18,13 @@ package org.apache.hadoop.ozone.recon.tasks; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE; + import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -41,28 +43,23 @@ import org.apache.hadoop.utils.db.TableIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.inject.Inject; + /** * Class to iterate over the OM DB and populate the Recon container DB with * the container -> Key reverse mapping. */ -public class ContainerKeyMapperTask extends ReconDBUpdateTask { +public class ContainerKeyMapperTask implements ReconDBUpdateTask { private static final Logger LOG = LoggerFactory.getLogger(ContainerKeyMapperTask.class); private ContainerDBServiceProvider containerDBServiceProvider; - private Collection tables = new ArrayList<>(); + @Inject public ContainerKeyMapperTask(ContainerDBServiceProvider - containerDBServiceProvider, - OMMetadataManager omMetadataManager) { - super("ContainerKeyMapperTask"); + containerDBServiceProvider) { this.containerDBServiceProvider = containerDBServiceProvider; - try { - tables.add(omMetadataManager.getKeyTable().getName()); - } catch (IOException ioEx) { - LOG.error("Unable to listen on Key Table updates ", ioEx); - } } /** @@ -103,13 +100,19 @@ public class ContainerKeyMapperTask extends ReconDBUpdateTask { } @Override - protected Collection getTaskTables() { - return tables; + public String getTaskName() { + return "ContainerKeyMapperTask"; } @Override - Pair process(OMUpdateEventBatch events) { + public Collection getTaskTables() { + return Collections.singletonList(KEY_TABLE); + } + + @Override + public Pair process(OMUpdateEventBatch events) { Iterator eventIterator = events.getIterator(); + int eventCount = 0; while (eventIterator.hasNext()) { OMDBUpdateEvent omdbUpdateEvent = eventIterator.next(); String updatedKey = omdbUpdateEvent.getKey(); @@ -127,12 +130,15 @@ public class ContainerKeyMapperTask extends ReconDBUpdateTask { default: LOG.debug("Skipping DB update event : " + omdbUpdateEvent .getAction()); } + eventCount++; } catch (IOException e) { LOG.error("Unexpected exception while updating key data : {} ", updatedKey, e); return new ImmutablePair<>(getTaskName(), false); } } + LOG.info("{} successfully processed {} OM DB update event(s).", + getTaskName(), eventCount); return new ImmutablePair<>(getTaskName(), true); } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java index a09eaff798..7432392a51 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java @@ -33,11 +33,12 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE; import static org.apache.hadoop.ozone.recon.tasks. OMDBUpdateEvent.OMDBUpdateAction.DELETE; import static org.apache.hadoop.ozone.recon.tasks. @@ -48,7 +49,7 @@ import static org.apache.hadoop.ozone.recon.tasks. * files binned into ranges (1KB, 2Kb..,4MB,.., 1TB,..1PB) to the Recon * fileSize DB. */ -public class FileSizeCountTask extends ReconDBUpdateTask { +public class FileSizeCountTask implements ReconDBUpdateTask { private static final Logger LOG = LoggerFactory.getLogger(FileSizeCountTask.class); @@ -56,19 +57,11 @@ public class FileSizeCountTask extends ReconDBUpdateTask { private long maxFileSizeUpperBound = 1125899906842624L; // 1 PB private long[] upperBoundCount; private long oneKb = 1024L; - private Collection tables = new ArrayList<>(); private FileCountBySizeDao fileCountBySizeDao; @Inject - public FileSizeCountTask(OMMetadataManager omMetadataManager, - Configuration sqlConfiguration) { - super("FileSizeCountTask"); - try { - tables.add(omMetadataManager.getKeyTable().getName()); - fileCountBySizeDao = new FileCountBySizeDao(sqlConfiguration); - } catch (Exception e) { - LOG.error("Unable to fetch Key Table updates ", e); - } + public FileSizeCountTask(Configuration sqlConfiguration) { + fileCountBySizeDao = new FileCountBySizeDao(sqlConfiguration); upperBoundCount = new long[getMaxBinSize()]; } @@ -98,7 +91,6 @@ public class FileSizeCountTask extends ReconDBUpdateTask { */ @Override public Pair reprocess(OMMetadataManager omMetadataManager) { - LOG.info("Starting a 'reprocess' run of FileSizeCountTask."); Table omKeyInfoTable = omMetadataManager.getKeyTable(); try (TableIterator> keyIter = omKeyInfoTable.iterator()) { @@ -119,8 +111,13 @@ public class FileSizeCountTask extends ReconDBUpdateTask { } @Override - protected Collection getTaskTables() { - return tables; + public String getTaskName() { + return "FileSizeCountTask"; + } + + @Override + public Collection getTaskTables() { + return Collections.singletonList(KEY_TABLE); } private void updateCountFromDB() { @@ -144,8 +141,7 @@ public class FileSizeCountTask extends ReconDBUpdateTask { * @return Pair */ @Override - Pair process(OMUpdateEventBatch events) { - LOG.info("Starting a 'process' run of FileSizeCountTask."); + public Pair process(OMUpdateEventBatch events) { Iterator eventIterator = events.getIterator(); //update array with file size count from DB @@ -246,9 +242,9 @@ public class FileSizeCountTask extends ReconDBUpdateTask { //decrement only if it had files before, default DB value is 0 upperBoundCount[binIndex]--; } else { - LOG.debug("Cannot decrement count. Default value is 0 (zero)."); - throw new IOException("Cannot decrement count. " - + "Default value is 0 (zero)."); + LOG.warn("Unexpected error while updating bin count. Found 0 count " + + "for index : " + binIndex + " while processing DELETE event for " + + omKeyInfo.getKeyName()); } } } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java index 82b7a35912..0fcabccb37 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java @@ -30,18 +30,18 @@ public final class OMDBUpdateEvent { private final String table; private final KEY updatedKey; private final VALUE updatedValue; - private final EventInfo eventInfo; + private final long sequenceNumber; private OMDBUpdateEvent(OMDBUpdateAction action, String table, KEY updatedKey, VALUE updatedValue, - EventInfo eventInfo) { + long sequenceNumber) { this.action = action; this.table = table; this.updatedKey = updatedKey; this.updatedValue = updatedValue; - this.eventInfo = eventInfo; + this.sequenceNumber = sequenceNumber; } public OMDBUpdateAction getAction() { @@ -60,8 +60,8 @@ public final class OMDBUpdateEvent { return updatedValue; } - public EventInfo getEventInfo() { - return eventInfo; + public long getSequenceNumber() { + return sequenceNumber; } /** @@ -75,7 +75,7 @@ public final class OMDBUpdateEvent { private String table; private KEY updatedKey; private VALUE updatedValue; - private EventInfo eventInfo; + private long lastSequenceNumber; OMUpdateEventBuilder setAction(OMDBUpdateAction omdbUpdateAction) { this.action = omdbUpdateAction; @@ -97,10 +97,8 @@ public final class OMDBUpdateEvent { return this; } - OMUpdateEventBuilder setEventInfo(long sequenceNumber, - long eventTimestampMillis) { - this.eventInfo = new EventInfo(sequenceNumber, - eventTimestampMillis); + OMUpdateEventBuilder setSequenceNumber(long sequenceNumber) { + this.lastSequenceNumber = sequenceNumber; return this; } @@ -114,30 +112,7 @@ public final class OMDBUpdateEvent { table, updatedKey, updatedValue, - eventInfo); - } - } - - /** - * Class used to hold timing information for an event. (Seq number and - * timestamp) - */ - public static class EventInfo { - private long sequenceNumber; - private long eventTimestampMillis; - - public EventInfo(long sequenceNumber, - long eventTimestampMillis) { - this.sequenceNumber = sequenceNumber; - this.eventTimestampMillis = eventTimestampMillis; - } - - public long getSequenceNumber() { - return sequenceNumber; - } - - public long getEventTimestampMillis() { - return eventTimestampMillis; + lastSequenceNumber); } } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java index d2d11b2865..00bbadeb84 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java @@ -78,6 +78,11 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler { /** * + * @param cfIndex + * @param keyBytes + * @param valueBytes + * @param action + * @throws IOException */ private void processEvent(int cfIndex, byte[] keyBytes, byte[] valueBytes, OMDBUpdateEvent.OMDBUpdateAction action) @@ -100,8 +105,8 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler { builder.setAction(action); OMDBUpdateEvent event = builder.build(); - LOG.info("Generated OM update Event for table : " + event.getTable() - + ", Key = " + event.getKey()); + LOG.debug("Generated OM update Event for table : " + event.getTable() + + ", Key = " + event.getKey() + ", action = " + event.getAction()); // Temporarily adding to an event buffer for testing. In subsequent JIRAs, // a Recon side class will be implemented that requests delta updates // from OM and calls on this handler. In that case, we will fill up diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java index 3b7cc5bcf2..f137418976 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java @@ -31,7 +31,7 @@ public class OMUpdateEventBatch { private List events; - OMUpdateEventBatch(Collection e) { + public OMUpdateEventBatch(Collection e) { events = new ArrayList<>(e); } @@ -39,11 +39,11 @@ public class OMUpdateEventBatch { * Get Sequence Number and timestamp of last event in this batch. * @return Event Info instance. */ - OMDBUpdateEvent.EventInfo getLastEventInfo() { + long getLastSequenceNumber() { if (events.isEmpty()) { - return new OMDBUpdateEvent.EventInfo(-1, -1); + return -1; } else { - return events.get(events.size() - 1).getEventInfo(); + return events.get(events.size() - 1).getSequenceNumber(); } } @@ -66,4 +66,12 @@ public class OMUpdateEventBatch { .filter(e -> tables.contains(e.getTable())) .collect(Collectors.toList())); } + + /** + * Return if empty. + * @return true if empty, else false. + */ + public boolean isEmpty() { + return !getIterator().hasNext(); + } } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java index d828577af6..426e0ae0a3 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java @@ -24,43 +24,35 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.ozone.om.OMMetadataManager; /** - * Abstract class used to denote a Recon task that needs to act on OM DB events. + * Interface used to denote a Recon task that needs to act on OM DB events. */ -public abstract class ReconDBUpdateTask { - - private String taskName; - - protected ReconDBUpdateTask(String taskName) { - this.taskName = taskName; - } +public interface ReconDBUpdateTask { /** * Return task name. * @return task name */ - public String getTaskName() { - return taskName; - } + String getTaskName(); /** * Return the list of tables that the task is listening on. * Empty list means the task is NOT listening on any tables. * @return Collection of Tables. */ - protected abstract Collection getTaskTables(); + Collection getTaskTables(); /** * Process a set of OM events on tables that the task is listening on. * @param events Set of events to be processed by the task. * @return Pair of task name -> task success. */ - abstract Pair process(OMUpdateEventBatch events); + Pair process(OMUpdateEventBatch events); /** * Process a on tables that the task is listening on. * @param omMetadataManager OM Metadata manager instance. * @return Pair of task name -> task success. */ - abstract Pair reprocess(OMMetadataManager omMetadataManager); + Pair reprocess(OMMetadataManager omMetadataManager); } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java index 7548cc9116..728a199f5f 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java @@ -20,6 +20,9 @@ package org.apache.hadoop.ozone.recon.tasks; import java.util.Map; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; + /** * Controller used by Recon to manage Tasks that are waiting on Recon events. */ @@ -36,11 +39,31 @@ public interface ReconTaskController { * @param events set of events * @throws InterruptedException InterruptedException */ - void consumeOMEvents(OMUpdateEventBatch events) throws InterruptedException; + void consumeOMEvents(OMUpdateEventBatch events, + OMMetadataManager omMetadataManager) + throws InterruptedException; + + /** + * Pass on the handle to a new OM DB instance to the registered tasks. + * @param omMetadataManager OM Metadata Manager instance + */ + void reInitializeTasks(OMMetadataManager omMetadataManager) + throws InterruptedException; /** * Get set of registered tasks. * @return Map of Task name -> Task. */ Map getRegisteredTasks(); + + /** + * Get instance of ReconTaskStatusDao. + * @return instance of ReconTaskStatusDao + */ + ReconTaskStatusDao getReconTaskStatusDao(); + + /** + * Stop the tasks. Start API is not needed since it is implicit. + */ + void stop(); } diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java index 3fd7d966eb..9135705cca 100644 --- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java +++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetadataManager; import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; import org.jooq.Configuration; @@ -57,21 +58,22 @@ public class ReconTaskControllerImpl implements ReconTaskController { private ExecutorService executorService; private int threadCount = 1; private final Semaphore taskSemaphore = new Semaphore(1); - private final ReconOMMetadataManager omMetadataManager; private Map taskFailureCounter = new HashMap<>(); private static final int TASK_FAILURE_THRESHOLD = 2; private ReconTaskStatusDao reconTaskStatusDao; @Inject public ReconTaskControllerImpl(OzoneConfiguration configuration, - ReconOMMetadataManager omMetadataManager, - Configuration sqlConfiguration) { - this.omMetadataManager = omMetadataManager; + Configuration sqlConfiguration, + Set tasks) { reconDBUpdateTasks = new HashMap<>(); threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY, OZONE_RECON_TASK_THREAD_COUNT_DEFAULT); executorService = Executors.newFixedThreadPool(threadCount); reconTaskStatusDao = new ReconTaskStatusDao(sqlConfiguration); + for (ReconDBUpdateTask task : tasks) { + registerTask(task); + } } @Override @@ -86,7 +88,9 @@ public class ReconTaskControllerImpl implements ReconTaskController { // Create DB record for the task. ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName, 0L, 0L); - reconTaskStatusDao.insert(reconTaskStatusRecord); + if (!reconTaskStatusDao.existsById(taskName)) { + reconTaskStatusDao.insert(reconTaskStatusRecord); + } } /** @@ -98,7 +102,69 @@ public class ReconTaskControllerImpl implements ReconTaskController { * @throws InterruptedException */ @Override - public void consumeOMEvents(OMUpdateEventBatch events) + public void consumeOMEvents(OMUpdateEventBatch events, + OMMetadataManager omMetadataManager) + throws InterruptedException { + taskSemaphore.acquire(); + + try { + if (!events.isEmpty()) { + Collection> tasks = new ArrayList<>(); + for (Map.Entry taskEntry : + reconDBUpdateTasks.entrySet()) { + ReconDBUpdateTask task = taskEntry.getValue(); + Collection tables = task.getTaskTables(); + tasks.add(() -> task.process(events.filter(tables))); + } + + List> results = executorService.invokeAll(tasks); + List failedTasks = processTaskResults(results, events); + + // Retry + List retryFailedTasks = new ArrayList<>(); + if (!failedTasks.isEmpty()) { + tasks.clear(); + for (String taskName : failedTasks) { + ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName); + Collection tables = task.getTaskTables(); + tasks.add(() -> task.process(events.filter(tables))); + } + results = executorService.invokeAll(tasks); + retryFailedTasks = processTaskResults(results, events); + } + + // Reprocess the failed tasks. + // TODO Move to a separate task queue since reprocess may be a heavy + // operation for large OM DB instances + if (!retryFailedTasks.isEmpty()) { + tasks.clear(); + for (String taskName : failedTasks) { + ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName); + tasks.add(() -> task.reprocess(omMetadataManager)); + } + results = executorService.invokeAll(tasks); + List reprocessFailedTasks = + processTaskResults(results, events); + for (String taskName : reprocessFailedTasks) { + LOG.info("Reprocess step failed for task : " + taskName); + if (taskFailureCounter.get(taskName).incrementAndGet() > + TASK_FAILURE_THRESHOLD) { + LOG.info("Blacklisting Task since it failed retry and " + + "reprocess more than " + TASK_FAILURE_THRESHOLD + " times."); + reconDBUpdateTasks.remove(taskName); + } + } + } + } + } catch (ExecutionException e) { + LOG.error("Unexpected error : ", e); + } finally { + taskSemaphore.release(); + } + } + + @Override + public void reInitializeTasks(OMMetadataManager omMetadataManager) throws InterruptedException { taskSemaphore.acquire(); @@ -107,43 +173,14 @@ public class ReconTaskControllerImpl implements ReconTaskController { for (Map.Entry taskEntry : reconDBUpdateTasks.entrySet()) { ReconDBUpdateTask task = taskEntry.getValue(); - tasks.add(() -> task.process(events)); + tasks.add(() -> task.reprocess(omMetadataManager)); } List> results = executorService.invokeAll(tasks); - List failedTasks = processTaskResults(results, events); - - //Retry - List retryFailedTasks = new ArrayList<>(); - if (!failedTasks.isEmpty()) { - tasks.clear(); - for (String taskName : failedTasks) { - ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName); - tasks.add(() -> task.process(events)); - } - results = executorService.invokeAll(tasks); - retryFailedTasks = processTaskResults(results, events); - } - - //Reprocess - //TODO Move to a separate task queue since reprocess may be a heavy - //operation for large OM DB instances - if (!retryFailedTasks.isEmpty()) { - tasks.clear(); - for (String taskName : failedTasks) { - ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName); - tasks.add(() -> task.reprocess(omMetadataManager)); - } - results = executorService.invokeAll(tasks); - List reprocessFailedTasks = processTaskResults(results, events); - for (String taskName : reprocessFailedTasks) { - LOG.info("Reprocess step failed for task : " + taskName); - if (taskFailureCounter.get(taskName).incrementAndGet() > - TASK_FAILURE_THRESHOLD) { - LOG.info("Blacklisting Task since it failed retry and " + - "reprocess more than " + TASK_FAILURE_THRESHOLD + " times."); - reconDBUpdateTasks.remove(taskName); - } + for (Future f : results) { + String taskName = f.get().getLeft().toString(); + if (!(Boolean)f.get().getRight()) { + LOG.info("Init failed for task : " + taskName); } } } catch (ExecutionException e) { @@ -157,12 +194,12 @@ public class ReconTaskControllerImpl implements ReconTaskController { * Store the last completed event sequence number and timestamp to the DB * for that task. * @param taskName taskname to be updated. - * @param eventInfo contains the new sequence number and timestamp. + * @param lastSequenceNumber contains the new sequence number. */ private void storeLastCompletedTransaction( - String taskName, OMDBUpdateEvent.EventInfo eventInfo) { + String taskName, long lastSequenceNumber) { ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName, - eventInfo.getEventTimestampMillis(), eventInfo.getSequenceNumber()); + System.currentTimeMillis(), lastSequenceNumber); reconTaskStatusDao.update(reconTaskStatusRecord); } @@ -171,6 +208,16 @@ public class ReconTaskControllerImpl implements ReconTaskController { return reconDBUpdateTasks; } + @Override + public ReconTaskStatusDao getReconTaskStatusDao() { + return reconTaskStatusDao; + } + + @Override + public void stop() { + this.executorService.shutdownNow(); + } + /** * Wait on results of all tasks. * @param results Set of Futures. @@ -190,7 +237,7 @@ public class ReconTaskControllerImpl implements ReconTaskController { failedTasks.add(f.get().getLeft().toString()); } else { taskFailureCounter.get(taskName).set(0); - storeLastCompletedTransaction(taskName, events.getLastEventInfo()); + storeLastCompletedTransaction(taskName, events.getLastSequenceNumber()); } } return failedTasks; diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java index 7dc987d0e1..fe2cf491c4 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java @@ -56,7 +56,7 @@ public abstract class AbstractOMMetadataManagerTest { public TemporaryFolder temporaryFolder = new TemporaryFolder(); /** - * Create a new OM Metadata manager instance. + * Create a new OM Metadata manager instance with default volume and bucket. * @throws IOException ioEx */ protected OMMetadataManager initializeNewOmMetadataManager() @@ -87,6 +87,19 @@ public abstract class AbstractOMMetadataManagerTest { return omMetadataManager; } + /** + * Create an empty OM Metadata manager instance. + * @throws IOException ioEx + */ + protected OMMetadataManager initializeEmptyOmMetadataManager() + throws IOException { + File omDbDir = temporaryFolder.newFolder(); + OzoneConfiguration omConfiguration = new OzoneConfiguration(); + omConfiguration.set(OZONE_OM_DB_DIRS, + omDbDir.getAbsolutePath()); + return new OmMetadataManagerImpl(omConfiguration); + } + /** * Get an instance of Recon OM Metadata manager. * @return ReconOMMetadataManager diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java index f531bb20db..ad04837421 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java @@ -61,7 +61,7 @@ public class TestReconUtils { OzoneConfiguration configuration = new OzoneConfiguration(); configuration.set("TEST_DB_DIR", filePath); - File file = ReconUtils.getReconDbDir(configuration, + File file = new ReconUtils().getReconDbDir(configuration, "TEST_DB_DIR"); Assert.assertEquals(filePath, file.getAbsolutePath()); } @@ -89,7 +89,7 @@ public class TestReconUtils { //Create test tar file. File tarFile = OmUtils.createTarFile(newDir.toPath()); File outputDir = folder.newFolder(); - ReconUtils.untarCheckpointFile(tarFile, outputDir.toPath()); + new ReconUtils().untarCheckpointFile(tarFile, outputDir.toPath()); assertTrue(outputDir.isDirectory()); assertTrue(outputDir.listFiles().length == 2); @@ -126,7 +126,8 @@ public class TestReconUtils { } }); - InputStream inputStream = ReconUtils.makeHttpCall(httpClientMock, url); + InputStream inputStream = new ReconUtils() + .makeHttpCall(httpClientMock, url); String contents = IOUtils.toString(inputStream, Charset.defaultCharset()); assertEquals("File 1 Contents", contents); diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java index 3ae39a6354..3bef4a0a27 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java @@ -20,10 +20,9 @@ package org.apache.hadoop.ozone.recon.api; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -36,15 +35,12 @@ import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest; import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl; -import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata; import org.apache.hadoop.ozone.recon.api.types.ContainersResponse; import org.apache.hadoop.ozone.recon.api.types.KeyMetadata; @@ -53,53 +49,33 @@ import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask; -import org.apache.hadoop.utils.db.DBCheckpoint; -import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.hadoop.utils.db.Table; import org.hadoop.ozone.recon.schema.StatsSchemaDefinition; import org.jooq.impl.DSL; import org.jooq.impl.DefaultConfiguration; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import com.google.inject.AbstractModule; import com.google.inject.Injector; -import org.junit.rules.TemporaryFolder; - /** * Test for container key service. */ -@RunWith(PowerMockRunner.class) -@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"}) -@PrepareForTest(ReconUtils.class) public class TestContainerKeyService extends AbstractOMMetadataManagerTest { - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); private ContainerDBServiceProvider containerDbServiceProvider; - private OMMetadataManager omMetadataManager; private Injector injector; private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider; private ContainerKeyService containerKeyService; private GuiceInjectorUtilsForTestsImpl guiceInjectorTest = new GuiceInjectorUtilsForTestsImpl(); private boolean isSetupDone = false; - + private ReconOMMetadataManager reconOMMetadataManager; private void initializeInjector() throws Exception { - omMetadataManager = initializeNewOmMetadataManager(); - OzoneConfiguration configuration = - guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder); - - ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl( - configuration); - ReconOMMetadataManager reconOMMetadataManager = - getTestMetadataManager(omMetadataManager); + reconOMMetadataManager = getTestMetadataManager( + initializeNewOmMetadataManager()); + ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider(); Injector parentInjector = guiceInjectorTest.getInjector( ozoneManagerServiceProvider, reconOMMetadataManager, temporaryFolder); @@ -150,7 +126,7 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest { OmKeyLocationInfoGroup(0, omKeyLocationInfoList); //key = key_one, Blocks = [ {CID = 1, LID = 101}, {CID = 2, LID = 102} ] - writeDataToOm(omMetadataManager, + writeDataToOm(reconOMMetadataManager, "key_one", "bucketOne", "sampleVol", Collections.singletonList(omKeyLocationInfoGroup)); @@ -174,7 +150,7 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest { omKeyLocationInfoListNew)); //key = key_two, Blocks = [ {CID = 1, LID = 103}, {CID = 1, LID = 104} ] - writeDataToOm(omMetadataManager, + writeDataToOm(reconOMMetadataManager, "key_two", "bucketOne", "sampleVol", infoGroups); List omKeyLocationInfoList2 = new ArrayList<>(); @@ -192,27 +168,18 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest { OmKeyLocationInfoGroup(0, omKeyLocationInfoList2); //key = key_three, Blocks = [ {CID = 2, LID = 2}, {CID = 2, LID = 3} ] - writeDataToOm(omMetadataManager, + writeDataToOm(reconOMMetadataManager, "key_three", "bucketOne", "sampleVol", Collections.singletonList(omKeyLocationInfoGroup2)); - //Take snapshot of OM DB and copy over to Recon OM DB. - DBCheckpoint checkpoint = omMetadataManager.getStore() - .getCheckpoint(true); - File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation()); - InputStream inputStream = new FileInputStream(tarFile); - PowerMockito.stub(PowerMockito.method(ReconUtils.class, - "makeHttpCall", - CloseableHttpClient.class, String.class)) - .toReturn(inputStream); - //Generate Recon container DB data. - ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask( - containerDbServiceProvider, - ozoneManagerServiceProvider.getOMMetadataManagerInstance()); - ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot(); - containerKeyMapperTask.reprocess(ozoneManagerServiceProvider - .getOMMetadataManagerInstance()); + OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class); + Table tableMock = mock(Table.class); + when(tableMock.getName()).thenReturn("KeyTable"); + when(omMetadataManagerMock.getKeyTable()).thenReturn(tableMock); + ContainerKeyMapperTask containerKeyMapperTask = + new ContainerKeyMapperTask(containerDbServiceProvider); + containerKeyMapperTask.reprocess(reconOMMetadataManager); } @Test @@ -397,4 +364,10 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest { assertEquals(2, containers.size()); assertEquals(2, data.getTotalCount()); } + + private OzoneManagerServiceProviderImpl getMockOzoneManagerServiceProvider() { + OzoneManagerServiceProviderImpl omServiceProviderMock = + mock(OzoneManagerServiceProviderImpl.class); + return omServiceProviderMock; + } } \ No newline at end of file diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java index a5c726392c..a3265b82a6 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java @@ -18,35 +18,25 @@ package org.apache.hadoop.ozone.recon.api; -import org.apache.hadoop.ozone.recon.ReconUtils; import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao; import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Test for File size count service. */ -@RunWith(PowerMockRunner.class) -@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"}) -@PrepareForTest(ReconUtils.class) public class TestUtilizationService { private UtilizationService utilizationService; - @Mock private FileCountBySizeDao fileCountBySizeDao; private int maxBinSize = 42; private List setUpResultList() { @@ -68,6 +58,7 @@ public class TestUtilizationService { public void testGetFileCounts() { List resultList = setUpResultList(); + FileCountBySizeDao fileCountBySizeDao = mock(FileCountBySizeDao.class); utilizationService = mock(UtilizationService.class); when(utilizationService.getFileCounts()).thenCallRealMethod(); when(utilizationService.getDao()).thenReturn(fileCountBySizeDao); diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index fde81426a4..c2a6dd8500 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -18,104 +18,108 @@ package org.apache.hadoop.ozone.recon.spi.impl; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileWriter; +import java.io.IOException; import java.io.InputStream; import java.nio.file.Paths; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest; -import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl; import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler; +import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch; +import org.apache.hadoop.ozone.recon.tasks.ReconTaskController; import org.apache.hadoop.utils.db.DBCheckpoint; -import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.hadoop.utils.db.DBUpdatesWrapper; +import org.apache.hadoop.utils.db.RDBStore; +import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; +import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import com.google.inject.Injector; +import org.mockito.ArgumentCaptor; +import org.rocksdb.RocksDB; +import org.rocksdb.TransactionLogIterator; +import org.rocksdb.WriteBatch; /** * Class to test Ozone Manager Service Provider Implementation. */ -@RunWith(PowerMockRunner.class) -@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"}) -@PrepareForTest(ReconUtils.class) public class TestOzoneManagerServiceProviderImpl extends AbstractOMMetadataManagerTest { - private OMMetadataManager omMetadataManager; - private ReconOMMetadataManager reconOMMetadataManager; - private Injector injector; - private GuiceInjectorUtilsForTestsImpl guiceInjectorTest = - new GuiceInjectorUtilsForTestsImpl(); - private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider; - private boolean isSetupDone = false; - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private OzoneConfiguration configuration; + private OzoneManagerProtocol ozoneManagerProtocol; @Before public void setUp() throws Exception { - omMetadataManager = initializeNewOmMetadataManager(); - writeDataToOm(omMetadataManager, "key_one"); - reconOMMetadataManager = getTestMetadataManager(omMetadataManager); - ozoneManagerServiceProvider = - new OzoneManagerServiceProviderImpl( - guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder)); - if (!isSetupDone) { - injector = guiceInjectorTest.getInjector(ozoneManagerServiceProvider, - reconOMMetadataManager, temporaryFolder); - - isSetupDone = true; - } + configuration = new OzoneConfiguration(); + configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, + temporaryFolder.newFolder().getAbsolutePath()); + configuration.set(OZONE_RECON_DB_DIR, + temporaryFolder.newFolder().getAbsolutePath()); + configuration.set("ozone.om.address", "localhost:9862"); + ozoneManagerProtocol = getMockOzoneManagerClient(new DBUpdatesWrapper()); } @Test - public void testInit() throws Exception { + public void testUpdateReconOmDBWithNewSnapshot() throws Exception { - Assert.assertNotNull(reconOMMetadataManager.getKeyTable() - .get("/sampleVol/bucketOne/key_one")); - Assert.assertNull(reconOMMetadataManager.getKeyTable() - .get("/sampleVol/bucketOne/key_two")); + OMMetadataManager omMetadataManager = initializeNewOmMetadataManager(); + ReconOMMetadataManager reconOMMetadataManager = + getTestMetadataManager(omMetadataManager); + writeDataToOm(omMetadataManager, "key_one"); writeDataToOm(omMetadataManager, "key_two"); + DBCheckpoint checkpoint = omMetadataManager.getStore() .getCheckpoint(true); File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation()); InputStream inputStream = new FileInputStream(tarFile); - PowerMockito.stub(PowerMockito.method(ReconUtils.class, - "makeHttpCall", - CloseableHttpClient.class, String.class)) - .toReturn(inputStream); + ReconUtils reconUtilsMock = getMockReconUtils(); + when(reconUtilsMock.makeHttpCall(any(), anyString())) + .thenReturn(inputStream); - ozoneManagerServiceProvider.init(); + ReconTaskController reconTaskController = getMockTaskController(); - Assert.assertNotNull(reconOMMetadataManager.getKeyTable() + OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = + new OzoneManagerServiceProviderImpl(configuration, + reconOMMetadataManager, reconTaskController, reconUtilsMock, + ozoneManagerProtocol); + + Assert.assertNull(reconOMMetadataManager.getKeyTable() .get("/sampleVol/bucketOne/key_one")); - Assert.assertNotNull(reconOMMetadataManager.getKeyTable() + Assert.assertNull(reconOMMetadataManager.getKeyTable() .get("/sampleVol/bucketOne/key_two")); - } - @Test - public void testGetOMMetadataManagerInstance() throws Exception { - OMMetadataManager omMetaMgr = ozoneManagerServiceProvider - .getOMMetadataManagerInstance(); - assertNotNull(omMetaMgr); + ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot(); + + assertNotNull(reconOMMetadataManager.getKeyTable() + .get("/sampleVol/bucketOne/key_one")); + assertNotNull(reconOMMetadataManager.getKeyTable() + .get("/sampleVol/bucketOne/key_two")); } @Test @@ -144,12 +148,18 @@ public class TestOzoneManagerServiceProviderImpl extends //Create test tar file. File tarFile = OmUtils.createTarFile(checkpointDir.toPath()); - InputStream fileInputStream = new FileInputStream(tarFile); - PowerMockito.stub(PowerMockito.method(ReconUtils.class, - "makeHttpCall", - CloseableHttpClient.class, String.class)) - .toReturn(fileInputStream); + ReconUtils reconUtilsMock = getMockReconUtils(); + when(reconUtilsMock.makeHttpCall(any(), anyString())) + .thenReturn(fileInputStream); + + ReconOMMetadataManager reconOMMetadataManager = + mock(ReconOMMetadataManager.class); + ReconTaskController reconTaskController = getMockTaskController(); + OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = + new OzoneManagerServiceProviderImpl(configuration, + reconOMMetadataManager, reconTaskController, reconUtilsMock, + ozoneManagerProtocol); DBCheckpoint checkpoint = ozoneManagerServiceProvider .getOzoneManagerDBSnapshot(); @@ -158,4 +168,150 @@ public class TestOzoneManagerServiceProviderImpl extends assertTrue(checkpoint.getCheckpointLocation().toFile() .listFiles().length == 2); } + + @Test + public void testGetAndApplyDeltaUpdatesFromOM() throws Exception { + + // Writing 2 Keys into a source OM DB and collecting it in a + // DBUpdatesWrapper. + OMMetadataManager sourceOMMetadataMgr = initializeNewOmMetadataManager(); + writeDataToOm(sourceOMMetadataMgr, "key_one"); + writeDataToOm(sourceOMMetadataMgr, "key_two"); + + RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb(); + TransactionLogIterator transactionLogIterator = rocksDB.getUpdatesSince(0L); + DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper(); + while(transactionLogIterator.isValid()) { + TransactionLogIterator.BatchResult result = + transactionLogIterator.getBatch(); + result.writeBatch().markWalTerminationPoint(); + WriteBatch writeBatch = result.writeBatch(); + dbUpdatesWrapper.addWriteBatch(writeBatch.data(), + result.sequenceNumber()); + transactionLogIterator.next(); + } + + // OM Service Provider's Metadata Manager. + OMMetadataManager omMetadataManager = initializeNewOmMetadataManager(); + + OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = + new OzoneManagerServiceProviderImpl(configuration, + getTestMetadataManager(omMetadataManager), + getMockTaskController(), new ReconUtils(), + getMockOzoneManagerClient(dbUpdatesWrapper)); + + OMDBUpdatesHandler updatesHandler = + new OMDBUpdatesHandler(omMetadataManager); + ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM( + 0L, updatesHandler); + + // In this method, we have to assert the "GET" part and the "APPLY" path. + + // Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4 + // events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs). + assertEquals(4, updatesHandler.getEvents().size()); + + // Assert APPLY path --> Verify if the OM service provider's RocksDB got + // the changes. + String fullKey = omMetadataManager.getOzoneKey("sampleVol", + "bucketOne", "key_one"); + assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance() + .getKeyTable().isExist(fullKey)); + fullKey = omMetadataManager.getOzoneKey("sampleVol", + "bucketOne", "key_two"); + assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance() + .getKeyTable().isExist(fullKey)); + } + + @Test + public void testSyncDataFromOMFullSnapshot() throws Exception { + + // Empty OM DB to start with. + ReconOMMetadataManager omMetadataManager = getTestMetadataManager( + initializeEmptyOmMetadataManager()); + ReconTaskStatusDao reconTaskStatusDaoMock = + mock(ReconTaskStatusDao.class); + doNothing().when(reconTaskStatusDaoMock) + .update(any(ReconTaskStatus.class)); + + ReconTaskController reconTaskControllerMock = getMockTaskController(); + when(reconTaskControllerMock.getReconTaskStatusDao()) + .thenReturn(reconTaskStatusDaoMock); + doNothing().when(reconTaskControllerMock) + .reInitializeTasks(omMetadataManager); + + OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = + new OzoneManagerServiceProviderImpl(configuration, omMetadataManager, + reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol); + + //Should trigger full snapshot request. + ozoneManagerServiceProvider.syncDataFromOM(); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(ReconTaskStatus.class); + verify(reconTaskStatusDaoMock, times(1)) + .update(captor.capture()); + assertTrue(captor.getValue().getTaskName().equals("OM_DB_FULL_SNAPSHOT")); + verify(reconTaskControllerMock, times(1)) + .reInitializeTasks(omMetadataManager); + } + + @Test + public void testSyncDataFromOMDeltaUpdates() throws Exception { + + // Non-Empty OM DB to start with. + ReconOMMetadataManager omMetadataManager = getTestMetadataManager( + initializeNewOmMetadataManager()); + ReconTaskStatusDao reconTaskStatusDaoMock = + mock(ReconTaskStatusDao.class); + doNothing().when(reconTaskStatusDaoMock) + .update(any(ReconTaskStatus.class)); + + ReconTaskController reconTaskControllerMock = getMockTaskController(); + when(reconTaskControllerMock.getReconTaskStatusDao()) + .thenReturn(reconTaskStatusDaoMock); + doNothing().when(reconTaskControllerMock) + .consumeOMEvents(any(OMUpdateEventBatch.class), + any(OMMetadataManager.class)); + + OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = + new OzoneManagerServiceProviderImpl(configuration, omMetadataManager, + reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol); + + // Should trigger delta updates. + ozoneManagerServiceProvider.syncDataFromOM(); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(ReconTaskStatus.class); + verify(reconTaskStatusDaoMock, times(1)) + .update(captor.capture()); + assertTrue(captor.getValue().getTaskName().equals("OM_DB_DELTA_UPDATES")); + + verify(reconTaskControllerMock, times(1)) + .consumeOMEvents(any(OMUpdateEventBatch.class), + any(OMMetadataManager.class)); + } + + private ReconTaskController getMockTaskController() { + ReconTaskController reconTaskControllerMock = + mock(ReconTaskController.class); + return reconTaskControllerMock; + } + + private ReconUtils getMockReconUtils() throws IOException { + ReconUtils reconUtilsMock = mock(ReconUtils.class); + when(reconUtilsMock.getReconDbDir(any(), anyString())).thenCallRealMethod(); + doCallRealMethod().when(reconUtilsMock).untarCheckpointFile(any(), any()); + return reconUtilsMock; + } + + private OzoneManagerProtocol getMockOzoneManagerClient( + DBUpdatesWrapper dbUpdatesWrapper) throws IOException { + OzoneManagerProtocol ozoneManagerProtocolMock = + mock(OzoneManagerProtocol.class); + when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos + .DBUpdatesRequest.class))).thenReturn(dbUpdatesWrapper); + return ozoneManagerProtocolMock; + } + } \ No newline at end of file diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java index 3073907fc0..66be41eae1 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java @@ -29,13 +29,14 @@ import org.apache.hadoop.ozone.om.OMMetadataManager; * Dummy Recon task that has 3 modes of operations. * ALWAYS_FAIL / FAIL_ONCE / ALWAYS_PASS */ -public class DummyReconDBTask extends ReconDBUpdateTask { +public class DummyReconDBTask implements ReconDBUpdateTask { private int numFailuresAllowed = Integer.MIN_VALUE; private int callCtr = 0; + private String taskName; - public DummyReconDBTask(String taskName, TaskType taskType) { - super(taskName); + DummyReconDBTask(String taskName, TaskType taskType) { + this.taskName = taskName; if (taskType.equals(TaskType.FAIL_ONCE)) { numFailuresAllowed = 1; } else if (taskType.equals(TaskType.ALWAYS_FAIL)) { @@ -44,12 +45,17 @@ public class DummyReconDBTask extends ReconDBUpdateTask { } @Override - protected Collection getTaskTables() { + public String getTaskName() { + return taskName; + } + + @Override + public Collection getTaskTables() { return Collections.singletonList("volumeTable"); } @Override - Pair process(OMUpdateEventBatch events) { + public Pair process(OMUpdateEventBatch events) { if (++callCtr <= numFailuresAllowed) { return new ImmutablePair<>(getTaskName(), false); } else { @@ -58,7 +64,7 @@ public class DummyReconDBTask extends ReconDBUpdateTask { } @Override - Pair reprocess(OMMetadataManager omMetadataManager) { + public Pair reprocess(OMMetadataManager omMetadataManager) { if (++callCtr <= numFailuresAllowed) { return new ImmutablePair<>(getTaskName(), false); } else { diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java index 5cc9a48866..383797e26f 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java @@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.recon.tasks; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -28,7 +30,6 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -37,31 +38,22 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest; import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl; -import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; +import org.apache.hadoop.utils.db.Table; import org.hadoop.ozone.recon.schema.StatsSchemaDefinition; import org.jooq.impl.DSL; import org.jooq.impl.DefaultConfiguration; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import com.google.inject.Injector; import javax.sql.DataSource; /** * Unit test for Container Key mapper task. */ -@RunWith(PowerMockRunner.class) -@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"}) -@PrepareForTest(ReconUtils.class) public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest { private ContainerDBServiceProvider containerDbServiceProvider; @@ -77,16 +69,9 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest { return injector; } - @Rule - TemporaryFolder temporaryFolder = new TemporaryFolder(); - private void initializeInjector() throws Exception { omMetadataManager = initializeNewOmMetadataManager(); - OzoneConfiguration configuration = - guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder); - - ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl( - configuration); + ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider(); reconOMMetadataManager = getTestMetadataManager(omMetadataManager); injector = guiceInjectorTest.getInjector( @@ -151,10 +136,8 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest { Collections.singletonList(omKeyLocationInfoGroup)); ContainerKeyMapperTask containerKeyMapperTask = - new ContainerKeyMapperTask(containerDbServiceProvider, - ozoneManagerServiceProvider.getOMMetadataManagerInstance()); - containerKeyMapperTask.reprocess(ozoneManagerServiceProvider - .getOMMetadataManagerInstance()); + new ContainerKeyMapperTask(containerDbServiceProvider); + containerKeyMapperTask.reprocess(reconOMMetadataManager); keyPrefixesForContainer = containerDbServiceProvider.getKeyPrefixesForContainer(1); @@ -258,10 +241,8 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest { }}); ContainerKeyMapperTask containerKeyMapperTask = - new ContainerKeyMapperTask(containerDbServiceProvider, - ozoneManagerServiceProvider.getOMMetadataManagerInstance()); - containerKeyMapperTask.reprocess(ozoneManagerServiceProvider - .getOMMetadataManagerInstance()); + new ContainerKeyMapperTask(containerDbServiceProvider); + containerKeyMapperTask.reprocess(reconOMMetadataManager); keyPrefixesForContainer = containerDbServiceProvider .getKeyPrefixesForContainer(1); @@ -317,4 +298,17 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest { omKeyLocationInfoGroup)) .build(); } + + private OzoneManagerServiceProviderImpl getMockOzoneManagerServiceProvider() + throws IOException { + OzoneManagerServiceProviderImpl omServiceProviderMock = + mock(OzoneManagerServiceProviderImpl.class); + OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class); + Table tableMock = mock(Table.class); + when(tableMock.getName()).thenReturn("keyTable"); + when(omMetadataManagerMock.getKeyTable()).thenReturn(tableMock); + when(omServiceProviderMock.getOMMetadataManagerInstance()) + .thenReturn(omMetadataManagerMock); + return omServiceProviderMock; + } } \ No newline at end of file diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java index 47a5d6fd78..4d21e4b13f 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java @@ -24,31 +24,21 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.utils.db.TypedTable; import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - import java.io.IOException; -import static org.apache.hadoop.ozone.recon.tasks. - OMDBUpdateEvent.OMDBUpdateAction.PUT; +import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; +import static org.mockito.Mockito.when; /** * Unit test for File Size Count Task. */ -@RunWith(PowerMockRunner.class) -@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"}) -@PrepareForTest(OmKeyInfo.class) - public class TestFileSizeCountTask { @Test public void testCalculateBinIndex() { diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java index b587c89f5b..6760869727 100644 --- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java +++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.recon.tasks; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -28,14 +27,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.File; import java.util.Collections; +import java.util.HashSet; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.recon.persistence.AbstractSqlDatabaseTest; -import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; -import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl; import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition; import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; @@ -50,16 +48,12 @@ import org.junit.Test; public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest { private ReconTaskController reconTaskController; - private Configuration sqlConfiguration; + @Before public void setUp() throws Exception { - File omDbDir = temporaryFolder.newFolder(); OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); - ozoneConfiguration.set(OZONE_OM_DB_DIRS, omDbDir.getAbsolutePath()); - ReconOMMetadataManager omMetadataManager = new ReconOmMetadataManagerImpl( - ozoneConfiguration); sqlConfiguration = getInjector() .getInstance(Configuration.class); @@ -69,7 +63,7 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest { schemaDefinition.initializeSchema(); reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration, - omMetadataManager, sqlConfiguration); + sqlConfiguration, new HashSet<>()); } @Test @@ -86,15 +80,17 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest { @Test public void testConsumeOMEvents() throws Exception { - ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class); - when(reconDBUpdateTaskMock.getTaskTables()).thenReturn(Collections - .EMPTY_LIST); - when(reconDBUpdateTaskMock.getTaskName()).thenReturn("MockTask"); + ReconDBUpdateTask reconDBUpdateTaskMock = getMockTask("MockTask"); when(reconDBUpdateTaskMock.process(any(OMUpdateEventBatch.class))) .thenReturn(new ImmutablePair<>("MockTask", true)); reconTaskController.registerTask(reconDBUpdateTaskMock); + OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); + when(omUpdateEventBatchMock.isEmpty()).thenReturn(false); + when(omUpdateEventBatchMock.filter(Collections.singleton("MockTable"))) + .thenReturn(omUpdateEventBatchMock); reconTaskController.consumeOMEvents( - new OMUpdateEventBatch(Collections.emptyList())); + omUpdateEventBatchMock, + mock(OMMetadataManager.class)); verify(reconDBUpdateTaskMock, times(1)) .process(any()); @@ -107,17 +103,13 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest { new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.FAIL_ONCE); reconTaskController.registerTask(dummyReconDBTask); - - long currentTime = System.nanoTime(); - OMDBUpdateEvent.EventInfo eventInfoMock = mock( - OMDBUpdateEvent.EventInfo.class); - when(eventInfoMock.getSequenceNumber()).thenReturn(100L); - when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime); - + long currentTime = System.currentTimeMillis(); OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); - when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock); + when(omUpdateEventBatchMock.isEmpty()).thenReturn(false); + when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L); - reconTaskController.consumeOMEvents(omUpdateEventBatchMock); + reconTaskController.consumeOMEvents(omUpdateEventBatchMock, + mock(OMMetadataManager.class)); assertFalse(reconTaskController.getRegisteredTasks().isEmpty()); assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks() .get(dummyReconDBTask.getTaskName())); @@ -126,8 +118,8 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest { ReconTaskStatus dbRecord = dao.findById(taskName); Assert.assertEquals(taskName, dbRecord.getTaskName()); - Assert.assertEquals(Long.valueOf(currentTime), - dbRecord.getLastUpdatedTimestamp()); + Assert.assertTrue( + dbRecord.getLastUpdatedTimestamp() > currentTime); Assert.assertEquals(Long.valueOf(100L), dbRecord.getLastUpdatedSeqNumber()); } @@ -138,18 +130,14 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest { new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_FAIL); reconTaskController.registerTask(dummyReconDBTask); - - long currentTime = System.nanoTime(); - OMDBUpdateEvent.EventInfo eventInfoMock = - mock(OMDBUpdateEvent.EventInfo.class); - when(eventInfoMock.getSequenceNumber()).thenReturn(100L); - when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime); - OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); - when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock); + when(omUpdateEventBatchMock.isEmpty()).thenReturn(false); + when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L); + OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class); for (int i = 0; i < 2; i++) { - reconTaskController.consumeOMEvents(omUpdateEventBatchMock); + reconTaskController.consumeOMEvents(omUpdateEventBatchMock, + omMetadataManagerMock); assertFalse(reconTaskController.getRegisteredTasks().isEmpty()); assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks() @@ -157,8 +145,8 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest { } //Should be blacklisted now. - reconTaskController.consumeOMEvents( - new OMUpdateEventBatch(Collections.emptyList())); + reconTaskController.consumeOMEvents(omUpdateEventBatchMock, + omMetadataManagerMock); assertTrue(reconTaskController.getRegisteredTasks().isEmpty()); ReconTaskStatusDao dao = new ReconTaskStatusDao(sqlConfiguration); @@ -168,4 +156,36 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest { Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedTimestamp()); Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedSeqNumber()); } + + + @Test + public void testReInitializeTasks() throws Exception { + + OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class); + ReconDBUpdateTask reconDBUpdateTaskMock = + getMockTask("MockTask2"); + when(reconDBUpdateTaskMock.reprocess(omMetadataManagerMock)) + .thenReturn(new ImmutablePair<>("MockTask2", true)); + + reconTaskController.registerTask(reconDBUpdateTaskMock); + reconTaskController.reInitializeTasks(omMetadataManagerMock); + + verify(reconDBUpdateTaskMock, times(1)) + .reprocess(omMetadataManagerMock); + } + + /** + * Helper method for getting a mocked Task. + * @param taskName name of the task. + * @return instance of ReconDBUpdateTask. + */ + private ReconDBUpdateTask getMockTask(String taskName) { + ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class); + when(reconDBUpdateTaskMock.getTaskTables()).thenReturn(Collections + .EMPTY_LIST); + when(reconDBUpdateTaskMock.getTaskName()).thenReturn(taskName); + when(reconDBUpdateTaskMock.getTaskTables()) + .thenReturn(Collections.singleton("MockTable")); + return reconDBUpdateTaskMock; + } } \ No newline at end of file