HDDS-1105 : Add mechanism in Recon to obtain DB snapshot 'delta' updates from Ozone Manager (#1259)

This commit is contained in:
avijayanhwx 2019-08-19 10:47:49 -07:00 committed by Hanisha Koneru
parent c765584eb2
commit d69a1a0aa4
33 changed files with 899 additions and 491 deletions

View File

@ -41,6 +41,10 @@ public class DBUpdatesWrapper {
return dataList;
}
public void setCurrentSequenceNumber(long sequenceNumber) {
this.currentSequenceNumber = sequenceNumber;
}
public long getCurrentSequenceNumber() {
return currentSequenceNumber;
}

View File

@ -37,6 +37,10 @@ public class RDBBatchOperation implements BatchOperation {
writeBatch = new WriteBatch();
}
public RDBBatchOperation(WriteBatch writeBatch) {
this.writeBatch = writeBatch;
}
public void commit(RocksDB db, WriteOptions writeOptions) throws IOException {
try {
db.write(writeOptions, writeBatch);

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import java.io.Closeable;
@ -49,6 +50,8 @@ import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.utils.db.DBUpdatesWrapper;
import org.apache.hadoop.utils.db.SequenceNumberNotFoundException;
/**
* Protocol to talk to OM.
@ -505,4 +508,14 @@ public interface OzoneManagerProtocol
* */
List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
/**
* Get DB updates since a specific sequence number.
* @param dbUpdatesRequest request that encapsulates a sequence number.
* @return Wrapper containing the updates.
* @throws SequenceNumberNotFoundException if db is unable to read the data.
*/
DBUpdatesWrapper getDBUpdates(
OzoneManagerProtocolProtos.DBUpdatesRequest dbUpdatesRequest)
throws IOException;
}

View File

@ -66,7 +66,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateF
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
@ -138,11 +140,14 @@ import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequ
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.utils.db.DBUpdatesWrapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -1396,7 +1401,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
OMRequest omRequest = createOMRequest(Type.SetAcl)
.setSetAclRequest(builder.build())
.build();
OzoneManagerProtocolProtos.SetAclResponse response =
SetAclResponse response =
handleError(submitRequest(omRequest)).getSetAclResponse();
return response.getResponse();
@ -1425,6 +1430,25 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
return acls;
}
@Override
public DBUpdatesWrapper getDBUpdates(DBUpdatesRequest dbUpdatesRequest)
throws IOException {
OMRequest omRequest = createOMRequest(Type.DBUpdates)
.setDbUpdatesRequest(dbUpdatesRequest)
.build();
DBUpdatesResponse dbUpdatesResponse =
handleError(submitRequest(omRequest)).getDbUpdatesResponse();
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
for (ByteString byteString : dbUpdatesResponse.getDataList()) {
dbUpdatesWrapper.addWriteBatch(byteString.toByteArray(), 0L);
}
dbUpdatesWrapper.setCurrentSequenceNumber(
dbUpdatesResponse.getSequenceNumber());
return dbUpdatesWrapper;
}
@Override
public OpenKeySession createFile(OmKeyArgs args,
boolean overWrite, boolean recursive) throws IOException {

View File

@ -3422,6 +3422,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
* @return Wrapper containing the updates.
* @throws SequenceNumberNotFoundException if db is unable to read the data.
*/
@Override
public DBUpdatesWrapper getDBUpdates(
DBUpdatesRequest dbUpdatesRequest)
throws SequenceNumberNotFoundException {

View File

@ -394,9 +394,10 @@ public class OzoneManagerRequestHandler implements RequestHandler {
DBUpdatesWrapper dbUpdatesWrapper =
impl.getDBUpdates(dbUpdatesRequest);
for (int i = 0; i < dbUpdatesWrapper.getData().size(); i++) {
builder.setData(i,
OMPBHelper.getByteString(dbUpdatesWrapper.getData().get(i)));
builder.addData(OMPBHelper.getByteString(
dbUpdatesWrapper.getData().get(i)));
}
builder.setSequenceNumber(dbUpdatesWrapper.getCurrentSequenceNumber());
return builder.build();
}

View File

@ -272,30 +272,6 @@
<version>2.8.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>1.7.4</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>1.7.4</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.jooq</groupId>
<artifactId>jooq</artifactId>

View File

@ -29,8 +29,13 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQ
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQL_MAX_IDLE_CONNECTION_AGE;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQL_MAX_IDLE_CONNECTION_TEST_STMT;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration;
import org.apache.hadoop.ozone.recon.persistence.JooqPersistenceModule;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
@ -40,9 +45,15 @@ import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
@ -52,6 +63,9 @@ import com.google.inject.Singleton;
* Guice controller that defines concrete bindings.
*/
public class ReconControllerModule extends AbstractModule {
private static final Logger LOG =
LoggerFactory.getLogger(ReconControllerModule.class);
@Override
protected void configure() {
bind(Configuration.class).toProvider(ConfigurationProvider.class);
@ -60,17 +74,37 @@ public class ReconControllerModule extends AbstractModule {
.toProvider(ReconContainerDBProvider.class).in(Singleton.class);
bind(ReconOMMetadataManager.class)
.to(ReconOmMetadataManagerImpl.class).in(Singleton.class);
bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class)
.in(Singleton.class);
bind(ContainerDBServiceProvider.class)
.to(ContainerDBServiceProviderImpl.class).in(Singleton.class);
bind(OzoneManagerServiceProvider.class)
.to(OzoneManagerServiceProviderImpl.class).in(Singleton.class);
bind(ReconUtils.class).in(Singleton.class);
// Persistence - inject configuration provider
install(new JooqPersistenceModule(
getProvider(DataSourceConfiguration.class)));
bind(ReconTaskController.class)
.to(ReconTaskControllerImpl.class).in(Singleton.class);
bind(ContainerKeyMapperTask.class);
bind(FileSizeCountTask.class);
}
@Provides
OzoneManagerProtocol getOzoneManagerProtocol(
final OzoneConfiguration ozoneConfiguration) {
OzoneManagerProtocol ozoneManagerClient = null;
try {
ClientId clientId = ClientId.randomId();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
ozoneManagerClient = new
OzoneManagerProtocolClientSideTranslatorPB(
ozoneConfiguration, clientId.toString(), ugi);
} catch (IOException ioEx) {
LOG.error("Error in provisioning OzoneManagerProtocol ", ioEx);
}
return ozoneManagerClient;
}
@Provides

View File

@ -18,26 +18,15 @@
package org.apache.hadoop.ozone.recon;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.cli.GenericCli;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition;
import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,13 +58,15 @@ public class ReconServer extends GenericCli {
ConfigurationProvider.setConfiguration(ozoneConfiguration);
injector = Guice.createInjector(new
ReconControllerModule(), new ReconRestServletModule() {
ReconControllerModule(),
new ReconRestServletModule() {
@Override
protected void configureServlets() {
rest("/api/*")
.packages("org.apache.hadoop.ozone.recon.api");
}
});
},
new ReconTaskBindingModule());
//Pass on injector to listener that does the Guice - Jersey HK2 bridging.
ReconGuiceServletContextListener.setInjector(injector);
@ -95,14 +86,20 @@ public class ReconServer extends GenericCli {
reconInternalSchemaDefinition.initializeSchema();
LOG.info("Recon server initialized successfully!");
} catch (Exception e) {
LOG.error("Error during initializing Recon server.", e);
}
httpServer = injector.getInstance(ReconHttpServer.class);
LOG.info("Starting Recon server");
httpServer.start();
scheduleReconTasks();
//Start Ozone Manager Service that pulls data from OM.
OzoneManagerServiceProvider ozoneManagerServiceProvider = injector
.getInstance(OzoneManagerServiceProvider.class);
ozoneManagerServiceProvider.start();
} catch (Exception e) {
LOG.error("Error during initializing Recon server.", e);
stop();
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
stop();
@ -113,52 +110,6 @@ public class ReconServer extends GenericCli {
return null;
}
/**
* Schedule the tasks that is required by Recon to keep its metadata up to
* date.
*/
private void scheduleReconTasks() {
OzoneConfiguration configuration = injector.getInstance(
OzoneConfiguration.class);
ContainerDBServiceProvider containerDBServiceProvider = injector
.getInstance(ContainerDBServiceProvider.class);
OzoneManagerServiceProvider ozoneManagerServiceProvider = injector
.getInstance(OzoneManagerServiceProvider.class);
Configuration sqlConfiguration = injector.getInstance(Configuration.class);
long initialDelay = configuration.getTimeDuration(
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
TimeUnit.MILLISECONDS);
long interval = configuration.getTimeDuration(
RECON_OM_SNAPSHOT_TASK_INTERVAL,
RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(() -> {
try {
ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
// Schedule the task to read OM DB and write the reverse mapping to
// Recon container DB.
ContainerKeyMapperTask containerKeyMapperTask =
new ContainerKeyMapperTask(containerDBServiceProvider,
ozoneManagerServiceProvider.getOMMetadataManagerInstance());
containerKeyMapperTask.reprocess(
ozoneManagerServiceProvider.getOMMetadataManagerInstance());
FileSizeCountTask fileSizeCountTask = new
FileSizeCountTask(
ozoneManagerServiceProvider.getOMMetadataManagerInstance(),
sqlConfiguration);
fileSizeCountTask.reprocess(
ozoneManagerServiceProvider.getOMMetadataManagerInstance());
} catch (IOException e) {
LOG.error("Unable to get OM " +
"Snapshot", e);
}
}, initialDelay, interval, TimeUnit.MILLISECONDS);
}
void stop() throws Exception {
LOG.info("Stopping Recon server");
httpServer.stop();

View File

@ -114,7 +114,7 @@ public final class ReconServerConfigKeys {
public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
"ozone.recon.task.thread.count";
public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 1;
public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 5;
/**
* Private constructor for utility class.

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon;
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
import org.apache.hadoop.ozone.recon.tasks.ReconDBUpdateTask;
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
/**
* Binds the various Recon Tasks.
*/
public class ReconTaskBindingModule extends AbstractModule {
@Override
protected void configure() {
Multibinder<ReconDBUpdateTask> taskBinder =
Multibinder.newSetBinder(binder(), ReconDBUpdateTask.class);
taskBinder.addBinding().to(ContainerKeyMapperTask.class);
taskBinder.addBinding().to(FileSizeCountTask.class);
}
}

View File

@ -51,11 +51,11 @@ import org.slf4j.LoggerFactory;
/**
* Recon Utility class.
*/
public final class ReconUtils {
public class ReconUtils {
private final static int WRITE_BUFFER = 1048576; //1MB
private ReconUtils() {
public ReconUtils() {
}
private static final Logger LOG = LoggerFactory.getLogger(
@ -69,7 +69,7 @@ public final class ReconUtils {
* @param dirConfigKey key to check
* @return Return File based on configured or fallback value.
*/
public static File getReconDbDir(Configuration conf, String dirConfigKey) {
public File getReconDbDir(Configuration conf, String dirConfigKey) {
File metadataDir = getDirectoryFromConfig(conf, dirConfigKey,
"Recon");
@ -90,7 +90,7 @@ public final class ReconUtils {
* @param destPath destination path to untar to.
* @throws IOException ioException
*/
public static void untarCheckpointFile(File tarFile, Path destPath)
public void untarCheckpointFile(File tarFile, Path destPath)
throws IOException {
FileInputStream fileInputStream = null;
@ -153,7 +153,7 @@ public final class ReconUtils {
* @return Inputstream to the response of the HTTP call.
* @throws IOException While reading the response.
*/
public static InputStream makeHttpCall(CloseableHttpClient httpClient,
public InputStream makeHttpCall(CloseableHttpClient httpClient,
String url)
throws IOException {

View File

@ -18,8 +18,6 @@ package org.apache.hadoop.ozone.recon.spi;
* limitations under the License.
*/
import java.io.IOException;
import org.apache.hadoop.ozone.om.OMMetadataManager;
/**
@ -28,14 +26,14 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
public interface OzoneManagerServiceProvider {
/**
* Initialize Ozone Manager Service Provider Impl.
* Start a task to sync data from OM.
*/
void init() throws IOException;
void start();
/**
* Update Recon OM DB with new snapshot from OM.
* Stop the OM sync data.
*/
void updateReconOmDBWithNewSnapshot() throws IOException;
void stop();
/**
* Return instance of OM Metadata manager.

View File

@ -37,6 +37,7 @@ import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
@ -73,6 +74,9 @@ public class ContainerDBServiceProviderImpl
@Inject
private Configuration sqlConfiguration;
@Inject
private ReconUtils reconUtils;
@Inject
public ContainerDBServiceProviderImpl(DBStore dbStore,
Configuration sqlConfiguration) {
@ -101,7 +105,8 @@ public class ContainerDBServiceProviderImpl
throws IOException {
File oldDBLocation = containerDbStore.getDbLocation();
containerDbStore = ReconContainerDBProvider.getNewDBStore(configuration);
containerDbStore = ReconContainerDBProvider
.getNewDBStore(configuration, reconUtils);
containerKeyTable = containerDbStore.getTable(CONTAINER_KEY_TABLE,
ContainerKeyPrefix.class, Integer.class);

View File

@ -27,35 +27,54 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNE
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
import static org.apache.hadoop.ozone.recon.ReconUtils.makeHttpCall;
import static org.apache.hadoop.ozone.recon.ReconUtils.untarCheckpointFile;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.utils.db.DBCheckpoint;
import org.apache.hadoop.utils.db.DBUpdatesWrapper;
import org.apache.hadoop.utils.db.RDBBatchOperation;
import org.apache.hadoop.utils.db.RDBStore;
import org.apache.hadoop.utils.db.RocksDBCheckpoint;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.ratis.protocol.ClientId;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,11 +94,28 @@ public class OzoneManagerServiceProviderImpl
private File omSnapshotDBParentDir = null;
private String omDBSnapshotUrl;
@Inject
private OzoneManagerProtocol ozoneManagerClient;
private final ClientId clientId = ClientId.randomId();
private final OzoneConfiguration configuration;
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
private ReconOMMetadataManager omMetadataManager;
private ReconTaskController reconTaskController;
private ReconTaskStatusDao reconTaskStatusDao;
private ReconUtils reconUtils;
private enum OmSnapshotTaskName {
OM_DB_FULL_SNAPSHOT,
OM_DB_DELTA_UPDATES
}
@Inject
public OzoneManagerServiceProviderImpl(Configuration configuration) {
public OzoneManagerServiceProviderImpl(
OzoneConfiguration configuration,
ReconOMMetadataManager omMetadataManager,
ReconTaskController reconTaskController,
ReconUtils reconUtils,
OzoneManagerProtocol ozoneManagerClient) throws IOException {
String ozoneManagerHttpAddress = configuration.get(OMConfigKeys
.OZONE_OM_HTTP_ADDRESS_KEY);
@ -87,7 +123,7 @@ public class OzoneManagerServiceProviderImpl
String ozoneManagerHttpsAddress = configuration.get(OMConfigKeys
.OZONE_OM_HTTPS_ADDRESS_KEY);
omSnapshotDBParentDir = getReconDbDir(configuration,
omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration,
OZONE_RECON_OM_SNAPSHOT_DB_DIR);
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(configuration);
@ -127,15 +163,79 @@ public class OzoneManagerServiceProviderImpl
omDBSnapshotUrl += "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true";
}
this.reconUtils = reconUtils;
this.omMetadataManager = omMetadataManager;
this.reconTaskController = reconTaskController;
this.reconTaskStatusDao = reconTaskController.getReconTaskStatusDao();
this.ozoneManagerClient = ozoneManagerClient;
this.configuration = configuration;
}
@Override
public void init() throws IOException {
updateReconOmDBWithNewSnapshot();
public OMMetadataManager getOMMetadataManagerInstance() {
return omMetadataManager;
}
@Override
public void updateReconOmDBWithNewSnapshot() throws IOException {
public void start() {
long initialDelay = configuration.getTimeDuration(
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
TimeUnit.MILLISECONDS);
long interval = configuration.getTimeDuration(
RECON_OM_SNAPSHOT_TASK_INTERVAL,
RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(this::syncDataFromOM,
initialDelay,
interval,
TimeUnit.MILLISECONDS);
}
@Override
public void stop() {
reconTaskController.stop();
scheduler.shutdownNow();
}
/**
* Method to obtain current OM DB Snapshot.
* @return DBCheckpoint instance.
*/
@VisibleForTesting
DBCheckpoint getOzoneManagerDBSnapshot() {
String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System
.currentTimeMillis();
File targetFile = new File(omSnapshotDBParentDir, snapshotFileName +
".tar.gz");
try {
try (InputStream inputStream = reconUtils.makeHttpCall(httpClient,
omDBSnapshotUrl)) {
FileUtils.copyInputStreamToFile(inputStream, targetFile);
}
// Untar the checkpoint file.
Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(),
snapshotFileName);
reconUtils.untarCheckpointFile(targetFile, untarredDbDir);
FileUtils.deleteQuietly(targetFile);
// TODO Create Checkpoint based on OM DB type.
// Currently, OM DB type is not configurable. Hence, defaulting to
// RocksDB.
return new RocksDBCheckpoint(untarredDbDir);
} catch (IOException e) {
LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", e);
}
return null;
}
/**
* Update Local OM DB with new OM DB snapshot.
* @throws IOException
*/
@VisibleForTesting
void updateReconOmDBWithNewSnapshot() throws IOException {
// Obtain the current DB snapshot from OM and
// update the in house OM metadata managed DB instance.
DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
@ -151,41 +251,97 @@ public class OzoneManagerServiceProviderImpl
}
}
@Override
public OMMetadataManager getOMMetadataManagerInstance() {
return omMetadataManager;
/**
* Get Delta updates from OM through RPC call and apply to local OM DB as
* well as accumulate in a buffer.
* @param fromSequenceNumber from sequence number to request from.
* @param omdbUpdatesHandler OM DB updates handler to buffer updates.
* @throws IOException when OM RPC request fails.
* @throws RocksDBException when writing to RocksDB fails.
*/
@VisibleForTesting
void getAndApplyDeltaUpdatesFromOM(
long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
.setSequenceNumber(fromSequenceNumber).build();
DBUpdatesWrapper dbUpdates = ozoneManagerClient.getDBUpdates(
dbUpdatesRequest);
if (null != dbUpdates) {
RDBStore rocksDBStore = (RDBStore)omMetadataManager.getStore();
RocksDB rocksDB = rocksDBStore.getDb();
LOG.debug("Number of updates received from OM : " +
dbUpdates.getData().size());
for (byte[] data : dbUpdates.getData()) {
WriteBatch writeBatch = new WriteBatch(data);
writeBatch.iterate(omdbUpdatesHandler);
RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(writeBatch);
rdbBatchOperation.commit(rocksDB, new WriteOptions());
}
}
}
/**
* Method to obtain current OM DB Snapshot.
* @return DBCheckpoint instance.
* Based on current state of Recon's OM DB, we either get delta updates or
* full snapshot from Ozone Manager.
*/
@VisibleForTesting
protected DBCheckpoint getOzoneManagerDBSnapshot() {
String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System
.currentTimeMillis();
File targetFile = new File(omSnapshotDBParentDir, snapshotFileName +
".tar.gz");
void syncDataFromOM() {
long currentSequenceNumber = getCurrentOMDBSequenceNumber();
boolean fullSnapshot = false;
if (currentSequenceNumber <= 0) {
fullSnapshot = true;
} else {
OMDBUpdatesHandler omdbUpdatesHandler =
new OMDBUpdatesHandler(omMetadataManager);
try {
try (InputStream inputStream = makeHttpCall(httpClient,
omDBSnapshotUrl)) {
FileUtils.copyInputStreamToFile(inputStream, targetFile);
}
//Untar the checkpoint file.
Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(),
snapshotFileName);
untarCheckpointFile(targetFile, untarredDbDir);
FileUtils.deleteQuietly(targetFile);
//TODO Create Checkpoint based on OM DB type.
// Currently, OM DB type is not configurable. Hence, defaulting to
// RocksDB.
return new RocksDBCheckpoint(untarredDbDir);
} catch (IOException e) {
LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", e);
}
return null;
// Get updates from OM and apply to local Recon OM DB.
getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
omdbUpdatesHandler);
// Update timestamp of successful delta updates query.
ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
OmSnapshotTaskName.OM_DB_DELTA_UPDATES.name(),
System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
reconTaskStatusDao.update(reconTaskStatusRecord);
// Pass on DB update events to tasks that are listening.
reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
omdbUpdatesHandler.getEvents()), omMetadataManager);
} catch (IOException | InterruptedException | RocksDBException e) {
LOG.warn("Unable to get and apply delta updates from OM.", e);
fullSnapshot = true;
}
}
if (fullSnapshot) {
try {
// Update local Recon OM DB to new snapshot.
updateReconOmDBWithNewSnapshot();
// Update timestamp of successful delta updates query.
ReconTaskStatus reconTaskStatusRecord =
new ReconTaskStatus(
OmSnapshotTaskName.OM_DB_FULL_SNAPSHOT.name(),
System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
reconTaskStatusDao.update(reconTaskStatusRecord);
// Reinitialize tasks that are listening.
reconTaskController.reInitializeTasks(omMetadataManager);
} catch (IOException | InterruptedException e) {
LOG.error("Unable to update Recon's OM DB with new snapshot ", e);
}
}
}
/**
* Get OM RocksDB's latest sequence number.
* @return latest sequence number.
*/
private long getCurrentOMDBSequenceNumber() {
RDBStore rocksDBStore = (RDBStore)omMetadataManager.getStore();
if (null == rocksDBStore) {
return 0;
} else {
return rocksDBStore.getDb().getLatestSequenceNumber();
}
}
}

View File

@ -22,11 +22,11 @@ import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_COUNT_T
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_CONTAINER_DB;
import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_TABLE;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
import java.nio.file.Path;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.hadoop.utils.db.DBStoreBuilder;
@ -52,9 +52,12 @@ public class ReconContainerDBProvider implements Provider<DBStore> {
@Inject
private OzoneConfiguration configuration;
@Inject
private ReconUtils reconUtils;
@Override
public DBStore get() {
DBStore dbStore = getNewDBStore(configuration);
DBStore dbStore = getNewDBStore(configuration, reconUtils);
if (dbStore == null) {
throw new ProvisionException("Unable to provide instance of DBStore " +
"store.");
@ -62,11 +65,13 @@ public class ReconContainerDBProvider implements Provider<DBStore> {
return dbStore;
}
public static DBStore getNewDBStore(OzoneConfiguration configuration) {
public static DBStore getNewDBStore(OzoneConfiguration configuration,
ReconUtils reconUtils) {
DBStore dbStore = null;
String dbName = RECON_CONTAINER_DB + "_" + System.currentTimeMillis();
try {
Path metaDir = getReconDbDir(configuration, OZONE_RECON_DB_DIR).toPath();
Path metaDir = reconUtils.getReconDbDir(
configuration, OZONE_RECON_DB_DIR).toPath();
dbStore = DBStoreBuilder.newBuilder(configuration)
.setPath(metaDir)
.setName(dbName)

View File

@ -18,11 +18,13 @@
package org.apache.hadoop.ozone.recon.tasks;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -41,28 +43,23 @@ import org.apache.hadoop.utils.db.TableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
/**
* Class to iterate over the OM DB and populate the Recon container DB with
* the container -> Key reverse mapping.
*/
public class ContainerKeyMapperTask extends ReconDBUpdateTask {
public class ContainerKeyMapperTask implements ReconDBUpdateTask {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerKeyMapperTask.class);
private ContainerDBServiceProvider containerDBServiceProvider;
private Collection<String> tables = new ArrayList<>();
@Inject
public ContainerKeyMapperTask(ContainerDBServiceProvider
containerDBServiceProvider,
OMMetadataManager omMetadataManager) {
super("ContainerKeyMapperTask");
containerDBServiceProvider) {
this.containerDBServiceProvider = containerDBServiceProvider;
try {
tables.add(omMetadataManager.getKeyTable().getName());
} catch (IOException ioEx) {
LOG.error("Unable to listen on Key Table updates ", ioEx);
}
}
/**
@ -103,13 +100,19 @@ public class ContainerKeyMapperTask extends ReconDBUpdateTask {
}
@Override
protected Collection<String> getTaskTables() {
return tables;
public String getTaskName() {
return "ContainerKeyMapperTask";
}
@Override
Pair<String, Boolean> process(OMUpdateEventBatch events) {
public Collection<String> getTaskTables() {
return Collections.singletonList(KEY_TABLE);
}
@Override
public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
int eventCount = 0;
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
String updatedKey = omdbUpdateEvent.getKey();
@ -127,12 +130,15 @@ public class ContainerKeyMapperTask extends ReconDBUpdateTask {
default: LOG.debug("Skipping DB update event : " + omdbUpdateEvent
.getAction());
}
eventCount++;
} catch (IOException e) {
LOG.error("Unexpected exception while updating key data : {} ",
updatedKey, e);
return new ImmutablePair<>(getTaskName(), false);
}
}
LOG.info("{} successfully processed {} OM DB update event(s).",
getTaskName(), eventCount);
return new ImmutablePair<>(getTaskName(), true);
}

View File

@ -33,11 +33,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
import static org.apache.hadoop.ozone.recon.tasks.
OMDBUpdateEvent.OMDBUpdateAction.DELETE;
import static org.apache.hadoop.ozone.recon.tasks.
@ -48,7 +49,7 @@ import static org.apache.hadoop.ozone.recon.tasks.
* files binned into ranges (1KB, 2Kb..,4MB,.., 1TB,..1PB) to the Recon
* fileSize DB.
*/
public class FileSizeCountTask extends ReconDBUpdateTask {
public class FileSizeCountTask implements ReconDBUpdateTask {
private static final Logger LOG =
LoggerFactory.getLogger(FileSizeCountTask.class);
@ -56,19 +57,11 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
private long maxFileSizeUpperBound = 1125899906842624L; // 1 PB
private long[] upperBoundCount;
private long oneKb = 1024L;
private Collection<String> tables = new ArrayList<>();
private FileCountBySizeDao fileCountBySizeDao;
@Inject
public FileSizeCountTask(OMMetadataManager omMetadataManager,
Configuration sqlConfiguration) {
super("FileSizeCountTask");
try {
tables.add(omMetadataManager.getKeyTable().getName());
public FileSizeCountTask(Configuration sqlConfiguration) {
fileCountBySizeDao = new FileCountBySizeDao(sqlConfiguration);
} catch (Exception e) {
LOG.error("Unable to fetch Key Table updates ", e);
}
upperBoundCount = new long[getMaxBinSize()];
}
@ -98,7 +91,6 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
*/
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
LOG.info("Starting a 'reprocess' run of FileSizeCountTask.");
Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable();
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
@ -119,8 +111,13 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
}
@Override
protected Collection<String> getTaskTables() {
return tables;
public String getTaskName() {
return "FileSizeCountTask";
}
@Override
public Collection<String> getTaskTables() {
return Collections.singletonList(KEY_TABLE);
}
private void updateCountFromDB() {
@ -144,8 +141,7 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
* @return Pair
*/
@Override
Pair<String, Boolean> process(OMUpdateEventBatch events) {
LOG.info("Starting a 'process' run of FileSizeCountTask.");
public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
//update array with file size count from DB
@ -246,9 +242,9 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
//decrement only if it had files before, default DB value is 0
upperBoundCount[binIndex]--;
} else {
LOG.debug("Cannot decrement count. Default value is 0 (zero).");
throw new IOException("Cannot decrement count. "
+ "Default value is 0 (zero).");
LOG.warn("Unexpected error while updating bin count. Found 0 count " +
"for index : " + binIndex + " while processing DELETE event for "
+ omKeyInfo.getKeyName());
}
}
}

View File

@ -30,18 +30,18 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
private final String table;
private final KEY updatedKey;
private final VALUE updatedValue;
private final EventInfo eventInfo;
private final long sequenceNumber;
private OMDBUpdateEvent(OMDBUpdateAction action,
String table,
KEY updatedKey,
VALUE updatedValue,
EventInfo eventInfo) {
long sequenceNumber) {
this.action = action;
this.table = table;
this.updatedKey = updatedKey;
this.updatedValue = updatedValue;
this.eventInfo = eventInfo;
this.sequenceNumber = sequenceNumber;
}
public OMDBUpdateAction getAction() {
@ -60,8 +60,8 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
return updatedValue;
}
public EventInfo getEventInfo() {
return eventInfo;
public long getSequenceNumber() {
return sequenceNumber;
}
/**
@ -75,7 +75,7 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
private String table;
private KEY updatedKey;
private VALUE updatedValue;
private EventInfo eventInfo;
private long lastSequenceNumber;
OMUpdateEventBuilder setAction(OMDBUpdateAction omdbUpdateAction) {
this.action = omdbUpdateAction;
@ -97,10 +97,8 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
return this;
}
OMUpdateEventBuilder setEventInfo(long sequenceNumber,
long eventTimestampMillis) {
this.eventInfo = new EventInfo(sequenceNumber,
eventTimestampMillis);
OMUpdateEventBuilder setSequenceNumber(long sequenceNumber) {
this.lastSequenceNumber = sequenceNumber;
return this;
}
@ -114,30 +112,7 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
table,
updatedKey,
updatedValue,
eventInfo);
}
}
/**
* Class used to hold timing information for an event. (Seq number and
* timestamp)
*/
public static class EventInfo {
private long sequenceNumber;
private long eventTimestampMillis;
public EventInfo(long sequenceNumber,
long eventTimestampMillis) {
this.sequenceNumber = sequenceNumber;
this.eventTimestampMillis = eventTimestampMillis;
}
public long getSequenceNumber() {
return sequenceNumber;
}
public long getEventTimestampMillis() {
return eventTimestampMillis;
lastSequenceNumber);
}
}

View File

@ -78,6 +78,11 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
/**
*
* @param cfIndex
* @param keyBytes
* @param valueBytes
* @param action
* @throws IOException
*/
private void processEvent(int cfIndex, byte[] keyBytes, byte[]
valueBytes, OMDBUpdateEvent.OMDBUpdateAction action)
@ -100,8 +105,8 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
builder.setAction(action);
OMDBUpdateEvent event = builder.build();
LOG.info("Generated OM update Event for table : " + event.getTable()
+ ", Key = " + event.getKey());
LOG.debug("Generated OM update Event for table : " + event.getTable()
+ ", Key = " + event.getKey() + ", action = " + event.getAction());
// Temporarily adding to an event buffer for testing. In subsequent JIRAs,
// a Recon side class will be implemented that requests delta updates
// from OM and calls on this handler. In that case, we will fill up

View File

@ -31,7 +31,7 @@ public class OMUpdateEventBatch {
private List<OMDBUpdateEvent> events;
OMUpdateEventBatch(Collection<OMDBUpdateEvent> e) {
public OMUpdateEventBatch(Collection<OMDBUpdateEvent> e) {
events = new ArrayList<>(e);
}
@ -39,11 +39,11 @@ public class OMUpdateEventBatch {
* Get Sequence Number and timestamp of last event in this batch.
* @return Event Info instance.
*/
OMDBUpdateEvent.EventInfo getLastEventInfo() {
long getLastSequenceNumber() {
if (events.isEmpty()) {
return new OMDBUpdateEvent.EventInfo(-1, -1);
return -1;
} else {
return events.get(events.size() - 1).getEventInfo();
return events.get(events.size() - 1).getSequenceNumber();
}
}
@ -66,4 +66,12 @@ public class OMUpdateEventBatch {
.filter(e -> tables.contains(e.getTable()))
.collect(Collectors.toList()));
}
/**
* Return if empty.
* @return true if empty, else false.
*/
public boolean isEmpty() {
return !getIterator().hasNext();
}
}

View File

@ -24,43 +24,35 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.om.OMMetadataManager;
/**
* Abstract class used to denote a Recon task that needs to act on OM DB events.
* Interface used to denote a Recon task that needs to act on OM DB events.
*/
public abstract class ReconDBUpdateTask {
private String taskName;
protected ReconDBUpdateTask(String taskName) {
this.taskName = taskName;
}
public interface ReconDBUpdateTask {
/**
* Return task name.
* @return task name
*/
public String getTaskName() {
return taskName;
}
String getTaskName();
/**
* Return the list of tables that the task is listening on.
* Empty list means the task is NOT listening on any tables.
* @return Collection of Tables.
*/
protected abstract Collection<String> getTaskTables();
Collection<String> getTaskTables();
/**
* Process a set of OM events on tables that the task is listening on.
* @param events Set of events to be processed by the task.
* @return Pair of task name -> task success.
*/
abstract Pair<String, Boolean> process(OMUpdateEventBatch events);
Pair<String, Boolean> process(OMUpdateEventBatch events);
/**
* Process a on tables that the task is listening on.
* @param omMetadataManager OM Metadata manager instance.
* @return Pair of task name -> task success.
*/
abstract Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager);
Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager);
}

View File

@ -20,6 +20,9 @@ package org.apache.hadoop.ozone.recon.tasks;
import java.util.Map;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
/**
* Controller used by Recon to manage Tasks that are waiting on Recon events.
*/
@ -36,11 +39,31 @@ public interface ReconTaskController {
* @param events set of events
* @throws InterruptedException InterruptedException
*/
void consumeOMEvents(OMUpdateEventBatch events) throws InterruptedException;
void consumeOMEvents(OMUpdateEventBatch events,
OMMetadataManager omMetadataManager)
throws InterruptedException;
/**
* Pass on the handle to a new OM DB instance to the registered tasks.
* @param omMetadataManager OM Metadata Manager instance
*/
void reInitializeTasks(OMMetadataManager omMetadataManager)
throws InterruptedException;
/**
* Get set of registered tasks.
* @return Map of Task name -> Task.
*/
Map<String, ReconDBUpdateTask> getRegisteredTasks();
/**
* Get instance of ReconTaskStatusDao.
* @return instance of ReconTaskStatusDao
*/
ReconTaskStatusDao getReconTaskStatusDao();
/**
* Stop the tasks. Start API is not needed since it is implicit.
*/
void stop();
}

View File

@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.jooq.Configuration;
@ -57,21 +58,22 @@ public class ReconTaskControllerImpl implements ReconTaskController {
private ExecutorService executorService;
private int threadCount = 1;
private final Semaphore taskSemaphore = new Semaphore(1);
private final ReconOMMetadataManager omMetadataManager;
private Map<String, AtomicInteger> taskFailureCounter = new HashMap<>();
private static final int TASK_FAILURE_THRESHOLD = 2;
private ReconTaskStatusDao reconTaskStatusDao;
@Inject
public ReconTaskControllerImpl(OzoneConfiguration configuration,
ReconOMMetadataManager omMetadataManager,
Configuration sqlConfiguration) {
this.omMetadataManager = omMetadataManager;
Configuration sqlConfiguration,
Set<ReconDBUpdateTask> tasks) {
reconDBUpdateTasks = new HashMap<>();
threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY,
OZONE_RECON_TASK_THREAD_COUNT_DEFAULT);
executorService = Executors.newFixedThreadPool(threadCount);
reconTaskStatusDao = new ReconTaskStatusDao(sqlConfiguration);
for (ReconDBUpdateTask task : tasks) {
registerTask(task);
}
}
@Override
@ -86,8 +88,10 @@ public class ReconTaskControllerImpl implements ReconTaskController {
// Create DB record for the task.
ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
0L, 0L);
if (!reconTaskStatusDao.existsById(taskName)) {
reconTaskStatusDao.insert(reconTaskStatusRecord);
}
}
/**
* For every registered task, we try process step twice and then reprocess
@ -98,16 +102,19 @@ public class ReconTaskControllerImpl implements ReconTaskController {
* @throws InterruptedException
*/
@Override
public void consumeOMEvents(OMUpdateEventBatch events)
public void consumeOMEvents(OMUpdateEventBatch events,
OMMetadataManager omMetadataManager)
throws InterruptedException {
taskSemaphore.acquire();
try {
if (!events.isEmpty()) {
Collection<Callable<Pair>> tasks = new ArrayList<>();
for (Map.Entry<String, ReconDBUpdateTask> taskEntry :
reconDBUpdateTasks.entrySet()) {
ReconDBUpdateTask task = taskEntry.getValue();
tasks.add(() -> task.process(events));
Collection<String> tables = task.getTaskTables();
tasks.add(() -> task.process(events.filter(tables)));
}
List<Future<Pair>> results = executorService.invokeAll(tasks);
@ -119,13 +126,14 @@ public class ReconTaskControllerImpl implements ReconTaskController {
tasks.clear();
for (String taskName : failedTasks) {
ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
tasks.add(() -> task.process(events));
Collection<String> tables = task.getTaskTables();
tasks.add(() -> task.process(events.filter(tables)));
}
results = executorService.invokeAll(tasks);
retryFailedTasks = processTaskResults(results, events);
}
//Reprocess
// Reprocess the failed tasks.
// TODO Move to a separate task queue since reprocess may be a heavy
// operation for large OM DB instances
if (!retryFailedTasks.isEmpty()) {
@ -135,7 +143,8 @@ public class ReconTaskControllerImpl implements ReconTaskController {
tasks.add(() -> task.reprocess(omMetadataManager));
}
results = executorService.invokeAll(tasks);
List<String> reprocessFailedTasks = processTaskResults(results, events);
List<String> reprocessFailedTasks =
processTaskResults(results, events);
for (String taskName : reprocessFailedTasks) {
LOG.info("Reprocess step failed for task : " + taskName);
if (taskFailureCounter.get(taskName).incrementAndGet() >
@ -146,6 +155,34 @@ public class ReconTaskControllerImpl implements ReconTaskController {
}
}
}
}
} catch (ExecutionException e) {
LOG.error("Unexpected error : ", e);
} finally {
taskSemaphore.release();
}
}
@Override
public void reInitializeTasks(OMMetadataManager omMetadataManager)
throws InterruptedException {
taskSemaphore.acquire();
try {
Collection<Callable<Pair>> tasks = new ArrayList<>();
for (Map.Entry<String, ReconDBUpdateTask> taskEntry :
reconDBUpdateTasks.entrySet()) {
ReconDBUpdateTask task = taskEntry.getValue();
tasks.add(() -> task.reprocess(omMetadataManager));
}
List<Future<Pair>> results = executorService.invokeAll(tasks);
for (Future<Pair> f : results) {
String taskName = f.get().getLeft().toString();
if (!(Boolean)f.get().getRight()) {
LOG.info("Init failed for task : " + taskName);
}
}
} catch (ExecutionException e) {
LOG.error("Unexpected error : ", e);
} finally {
@ -157,12 +194,12 @@ public class ReconTaskControllerImpl implements ReconTaskController {
* Store the last completed event sequence number and timestamp to the DB
* for that task.
* @param taskName taskname to be updated.
* @param eventInfo contains the new sequence number and timestamp.
* @param lastSequenceNumber contains the new sequence number.
*/
private void storeLastCompletedTransaction(
String taskName, OMDBUpdateEvent.EventInfo eventInfo) {
String taskName, long lastSequenceNumber) {
ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
eventInfo.getEventTimestampMillis(), eventInfo.getSequenceNumber());
System.currentTimeMillis(), lastSequenceNumber);
reconTaskStatusDao.update(reconTaskStatusRecord);
}
@ -171,6 +208,16 @@ public class ReconTaskControllerImpl implements ReconTaskController {
return reconDBUpdateTasks;
}
@Override
public ReconTaskStatusDao getReconTaskStatusDao() {
return reconTaskStatusDao;
}
@Override
public void stop() {
this.executorService.shutdownNow();
}
/**
* Wait on results of all tasks.
* @param results Set of Futures.
@ -190,7 +237,7 @@ public class ReconTaskControllerImpl implements ReconTaskController {
failedTasks.add(f.get().getLeft().toString());
} else {
taskFailureCounter.get(taskName).set(0);
storeLastCompletedTransaction(taskName, events.getLastEventInfo());
storeLastCompletedTransaction(taskName, events.getLastSequenceNumber());
}
}
return failedTasks;

View File

@ -56,7 +56,7 @@ public abstract class AbstractOMMetadataManagerTest {
public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
* Create a new OM Metadata manager instance.
* Create a new OM Metadata manager instance with default volume and bucket.
* @throws IOException ioEx
*/
protected OMMetadataManager initializeNewOmMetadataManager()
@ -87,6 +87,19 @@ public abstract class AbstractOMMetadataManagerTest {
return omMetadataManager;
}
/**
* Create an empty OM Metadata manager instance.
* @throws IOException ioEx
*/
protected OMMetadataManager initializeEmptyOmMetadataManager()
throws IOException {
File omDbDir = temporaryFolder.newFolder();
OzoneConfiguration omConfiguration = new OzoneConfiguration();
omConfiguration.set(OZONE_OM_DB_DIRS,
omDbDir.getAbsolutePath());
return new OmMetadataManagerImpl(omConfiguration);
}
/**
* Get an instance of Recon OM Metadata manager.
* @return ReconOMMetadataManager

View File

@ -61,7 +61,7 @@ public class TestReconUtils {
OzoneConfiguration configuration = new OzoneConfiguration();
configuration.set("TEST_DB_DIR", filePath);
File file = ReconUtils.getReconDbDir(configuration,
File file = new ReconUtils().getReconDbDir(configuration,
"TEST_DB_DIR");
Assert.assertEquals(filePath, file.getAbsolutePath());
}
@ -89,7 +89,7 @@ public class TestReconUtils {
//Create test tar file.
File tarFile = OmUtils.createTarFile(newDir.toPath());
File outputDir = folder.newFolder();
ReconUtils.untarCheckpointFile(tarFile, outputDir.toPath());
new ReconUtils().untarCheckpointFile(tarFile, outputDir.toPath());
assertTrue(outputDir.isDirectory());
assertTrue(outputDir.listFiles().length == 2);
@ -126,7 +126,8 @@ public class TestReconUtils {
}
});
InputStream inputStream = ReconUtils.makeHttpCall(httpClientMock, url);
InputStream inputStream = new ReconUtils()
.makeHttpCall(httpClientMock, url);
String contents = IOUtils.toString(inputStream, Charset.defaultCharset());
assertEquals("File 1 Contents", contents);

View File

@ -20,10 +20,9 @@ package org.apache.hadoop.ozone.recon.api;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -36,15 +35,12 @@ import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.ContainersResponse;
import org.apache.hadoop.ozone.recon.api.types.KeyMetadata;
@ -53,53 +49,33 @@ import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
import org.apache.hadoop.utils.db.DBCheckpoint;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.hadoop.utils.db.Table;
import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
import org.jooq.impl.DSL;
import org.jooq.impl.DefaultConfiguration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import org.junit.rules.TemporaryFolder;
/**
* Test for container key service.
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
@PrepareForTest(ReconUtils.class)
public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private ContainerDBServiceProvider containerDbServiceProvider;
private OMMetadataManager omMetadataManager;
private Injector injector;
private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
private ContainerKeyService containerKeyService;
private GuiceInjectorUtilsForTestsImpl guiceInjectorTest =
new GuiceInjectorUtilsForTestsImpl();
private boolean isSetupDone = false;
private ReconOMMetadataManager reconOMMetadataManager;
private void initializeInjector() throws Exception {
omMetadataManager = initializeNewOmMetadataManager();
OzoneConfiguration configuration =
guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder);
ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
configuration);
ReconOMMetadataManager reconOMMetadataManager =
getTestMetadataManager(omMetadataManager);
reconOMMetadataManager = getTestMetadataManager(
initializeNewOmMetadataManager());
ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider();
Injector parentInjector = guiceInjectorTest.getInjector(
ozoneManagerServiceProvider, reconOMMetadataManager, temporaryFolder);
@ -150,7 +126,7 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
OmKeyLocationInfoGroup(0, omKeyLocationInfoList);
//key = key_one, Blocks = [ {CID = 1, LID = 101}, {CID = 2, LID = 102} ]
writeDataToOm(omMetadataManager,
writeDataToOm(reconOMMetadataManager,
"key_one", "bucketOne", "sampleVol",
Collections.singletonList(omKeyLocationInfoGroup));
@ -174,7 +150,7 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
omKeyLocationInfoListNew));
//key = key_two, Blocks = [ {CID = 1, LID = 103}, {CID = 1, LID = 104} ]
writeDataToOm(omMetadataManager,
writeDataToOm(reconOMMetadataManager,
"key_two", "bucketOne", "sampleVol", infoGroups);
List<OmKeyLocationInfo> omKeyLocationInfoList2 = new ArrayList<>();
@ -192,27 +168,18 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
OmKeyLocationInfoGroup(0, omKeyLocationInfoList2);
//key = key_three, Blocks = [ {CID = 2, LID = 2}, {CID = 2, LID = 3} ]
writeDataToOm(omMetadataManager,
writeDataToOm(reconOMMetadataManager,
"key_three", "bucketOne", "sampleVol",
Collections.singletonList(omKeyLocationInfoGroup2));
//Take snapshot of OM DB and copy over to Recon OM DB.
DBCheckpoint checkpoint = omMetadataManager.getStore()
.getCheckpoint(true);
File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
InputStream inputStream = new FileInputStream(tarFile);
PowerMockito.stub(PowerMockito.method(ReconUtils.class,
"makeHttpCall",
CloseableHttpClient.class, String.class))
.toReturn(inputStream);
//Generate Recon container DB data.
ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask(
containerDbServiceProvider,
ozoneManagerServiceProvider.getOMMetadataManagerInstance());
ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
.getOMMetadataManagerInstance());
OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
Table tableMock = mock(Table.class);
when(tableMock.getName()).thenReturn("KeyTable");
when(omMetadataManagerMock.getKeyTable()).thenReturn(tableMock);
ContainerKeyMapperTask containerKeyMapperTask =
new ContainerKeyMapperTask(containerDbServiceProvider);
containerKeyMapperTask.reprocess(reconOMMetadataManager);
}
@Test
@ -397,4 +364,10 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
assertEquals(2, containers.size());
assertEquals(2, data.getTotalCount());
}
private OzoneManagerServiceProviderImpl getMockOzoneManagerServiceProvider() {
OzoneManagerServiceProviderImpl omServiceProviderMock =
mock(OzoneManagerServiceProviderImpl.class);
return omServiceProviderMock;
}
}

View File

@ -18,35 +18,25 @@
package org.apache.hadoop.ozone.recon.api;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for File size count service.
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
@PrepareForTest(ReconUtils.class)
public class TestUtilizationService {
private UtilizationService utilizationService;
@Mock private FileCountBySizeDao fileCountBySizeDao;
private int maxBinSize = 42;
private List<FileCountBySize> setUpResultList() {
@ -68,6 +58,7 @@ public class TestUtilizationService {
public void testGetFileCounts() {
List<FileCountBySize> resultList = setUpResultList();
FileCountBySizeDao fileCountBySizeDao = mock(FileCountBySizeDao.class);
utilizationService = mock(UtilizationService.class);
when(utilizationService.getFileCounts()).thenCallRealMethod();
when(utilizationService.getDao()).thenReturn(fileCountBySizeDao);

View File

@ -18,104 +18,108 @@
package org.apache.hadoop.ozone.recon.spi.impl;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Paths;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.utils.db.DBCheckpoint;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.hadoop.utils.db.DBUpdatesWrapper;
import org.apache.hadoop.utils.db.RDBStore;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.inject.Injector;
import org.mockito.ArgumentCaptor;
import org.rocksdb.RocksDB;
import org.rocksdb.TransactionLogIterator;
import org.rocksdb.WriteBatch;
/**
* Class to test Ozone Manager Service Provider Implementation.
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
@PrepareForTest(ReconUtils.class)
public class TestOzoneManagerServiceProviderImpl extends
AbstractOMMetadataManagerTest {
private OMMetadataManager omMetadataManager;
private ReconOMMetadataManager reconOMMetadataManager;
private Injector injector;
private GuiceInjectorUtilsForTestsImpl guiceInjectorTest =
new GuiceInjectorUtilsForTestsImpl();
private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
private boolean isSetupDone = false;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private OzoneConfiguration configuration;
private OzoneManagerProtocol ozoneManagerProtocol;
@Before
public void setUp() throws Exception {
omMetadataManager = initializeNewOmMetadataManager();
writeDataToOm(omMetadataManager, "key_one");
reconOMMetadataManager = getTestMetadataManager(omMetadataManager);
ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(
guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder));
if (!isSetupDone) {
injector = guiceInjectorTest.getInjector(ozoneManagerServiceProvider,
reconOMMetadataManager, temporaryFolder);
isSetupDone = true;
}
configuration = new OzoneConfiguration();
configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR,
temporaryFolder.newFolder().getAbsolutePath());
configuration.set(OZONE_RECON_DB_DIR,
temporaryFolder.newFolder().getAbsolutePath());
configuration.set("ozone.om.address", "localhost:9862");
ozoneManagerProtocol = getMockOzoneManagerClient(new DBUpdatesWrapper());
}
@Test
public void testInit() throws Exception {
public void testUpdateReconOmDBWithNewSnapshot() throws Exception {
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
.get("/sampleVol/bucketOne/key_one"));
Assert.assertNull(reconOMMetadataManager.getKeyTable()
.get("/sampleVol/bucketOne/key_two"));
OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();
ReconOMMetadataManager reconOMMetadataManager =
getTestMetadataManager(omMetadataManager);
writeDataToOm(omMetadataManager, "key_one");
writeDataToOm(omMetadataManager, "key_two");
DBCheckpoint checkpoint = omMetadataManager.getStore()
.getCheckpoint(true);
File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
InputStream inputStream = new FileInputStream(tarFile);
PowerMockito.stub(PowerMockito.method(ReconUtils.class,
"makeHttpCall",
CloseableHttpClient.class, String.class))
.toReturn(inputStream);
ReconUtils reconUtilsMock = getMockReconUtils();
when(reconUtilsMock.makeHttpCall(any(), anyString()))
.thenReturn(inputStream);
ozoneManagerServiceProvider.init();
ReconTaskController reconTaskController = getMockTaskController();
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(configuration,
reconOMMetadataManager, reconTaskController, reconUtilsMock,
ozoneManagerProtocol);
Assert.assertNull(reconOMMetadataManager.getKeyTable()
.get("/sampleVol/bucketOne/key_one"));
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
Assert.assertNull(reconOMMetadataManager.getKeyTable()
.get("/sampleVol/bucketOne/key_two"));
}
@Test
public void testGetOMMetadataManagerInstance() throws Exception {
OMMetadataManager omMetaMgr = ozoneManagerServiceProvider
.getOMMetadataManagerInstance();
assertNotNull(omMetaMgr);
ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
assertNotNull(reconOMMetadataManager.getKeyTable()
.get("/sampleVol/bucketOne/key_one"));
assertNotNull(reconOMMetadataManager.getKeyTable()
.get("/sampleVol/bucketOne/key_two"));
}
@Test
@ -144,12 +148,18 @@ public class TestOzoneManagerServiceProviderImpl extends
//Create test tar file.
File tarFile = OmUtils.createTarFile(checkpointDir.toPath());
InputStream fileInputStream = new FileInputStream(tarFile);
PowerMockito.stub(PowerMockito.method(ReconUtils.class,
"makeHttpCall",
CloseableHttpClient.class, String.class))
.toReturn(fileInputStream);
ReconUtils reconUtilsMock = getMockReconUtils();
when(reconUtilsMock.makeHttpCall(any(), anyString()))
.thenReturn(fileInputStream);
ReconOMMetadataManager reconOMMetadataManager =
mock(ReconOMMetadataManager.class);
ReconTaskController reconTaskController = getMockTaskController();
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(configuration,
reconOMMetadataManager, reconTaskController, reconUtilsMock,
ozoneManagerProtocol);
DBCheckpoint checkpoint = ozoneManagerServiceProvider
.getOzoneManagerDBSnapshot();
@ -158,4 +168,150 @@ public class TestOzoneManagerServiceProviderImpl extends
assertTrue(checkpoint.getCheckpointLocation().toFile()
.listFiles().length == 2);
}
@Test
public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
// Writing 2 Keys into a source OM DB and collecting it in a
// DBUpdatesWrapper.
OMMetadataManager sourceOMMetadataMgr = initializeNewOmMetadataManager();
writeDataToOm(sourceOMMetadataMgr, "key_one");
writeDataToOm(sourceOMMetadataMgr, "key_two");
RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb();
TransactionLogIterator transactionLogIterator = rocksDB.getUpdatesSince(0L);
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
while(transactionLogIterator.isValid()) {
TransactionLogIterator.BatchResult result =
transactionLogIterator.getBatch();
result.writeBatch().markWalTerminationPoint();
WriteBatch writeBatch = result.writeBatch();
dbUpdatesWrapper.addWriteBatch(writeBatch.data(),
result.sequenceNumber());
transactionLogIterator.next();
}
// OM Service Provider's Metadata Manager.
OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(configuration,
getTestMetadataManager(omMetadataManager),
getMockTaskController(), new ReconUtils(),
getMockOzoneManagerClient(dbUpdatesWrapper));
OMDBUpdatesHandler updatesHandler =
new OMDBUpdatesHandler(omMetadataManager);
ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
0L, updatesHandler);
// In this method, we have to assert the "GET" part and the "APPLY" path.
// Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
// events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
assertEquals(4, updatesHandler.getEvents().size());
// Assert APPLY path --> Verify if the OM service provider's RocksDB got
// the changes.
String fullKey = omMetadataManager.getOzoneKey("sampleVol",
"bucketOne", "key_one");
assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
.getKeyTable().isExist(fullKey));
fullKey = omMetadataManager.getOzoneKey("sampleVol",
"bucketOne", "key_two");
assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
.getKeyTable().isExist(fullKey));
}
@Test
public void testSyncDataFromOMFullSnapshot() throws Exception {
// Empty OM DB to start with.
ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
initializeEmptyOmMetadataManager());
ReconTaskStatusDao reconTaskStatusDaoMock =
mock(ReconTaskStatusDao.class);
doNothing().when(reconTaskStatusDaoMock)
.update(any(ReconTaskStatus.class));
ReconTaskController reconTaskControllerMock = getMockTaskController();
when(reconTaskControllerMock.getReconTaskStatusDao())
.thenReturn(reconTaskStatusDaoMock);
doNothing().when(reconTaskControllerMock)
.reInitializeTasks(omMetadataManager);
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);
//Should trigger full snapshot request.
ozoneManagerServiceProvider.syncDataFromOM();
ArgumentCaptor<ReconTaskStatus> captor =
ArgumentCaptor.forClass(ReconTaskStatus.class);
verify(reconTaskStatusDaoMock, times(1))
.update(captor.capture());
assertTrue(captor.getValue().getTaskName().equals("OM_DB_FULL_SNAPSHOT"));
verify(reconTaskControllerMock, times(1))
.reInitializeTasks(omMetadataManager);
}
@Test
public void testSyncDataFromOMDeltaUpdates() throws Exception {
// Non-Empty OM DB to start with.
ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
initializeNewOmMetadataManager());
ReconTaskStatusDao reconTaskStatusDaoMock =
mock(ReconTaskStatusDao.class);
doNothing().when(reconTaskStatusDaoMock)
.update(any(ReconTaskStatus.class));
ReconTaskController reconTaskControllerMock = getMockTaskController();
when(reconTaskControllerMock.getReconTaskStatusDao())
.thenReturn(reconTaskStatusDaoMock);
doNothing().when(reconTaskControllerMock)
.consumeOMEvents(any(OMUpdateEventBatch.class),
any(OMMetadataManager.class));
OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);
// Should trigger delta updates.
ozoneManagerServiceProvider.syncDataFromOM();
ArgumentCaptor<ReconTaskStatus> captor =
ArgumentCaptor.forClass(ReconTaskStatus.class);
verify(reconTaskStatusDaoMock, times(1))
.update(captor.capture());
assertTrue(captor.getValue().getTaskName().equals("OM_DB_DELTA_UPDATES"));
verify(reconTaskControllerMock, times(1))
.consumeOMEvents(any(OMUpdateEventBatch.class),
any(OMMetadataManager.class));
}
private ReconTaskController getMockTaskController() {
ReconTaskController reconTaskControllerMock =
mock(ReconTaskController.class);
return reconTaskControllerMock;
}
private ReconUtils getMockReconUtils() throws IOException {
ReconUtils reconUtilsMock = mock(ReconUtils.class);
when(reconUtilsMock.getReconDbDir(any(), anyString())).thenCallRealMethod();
doCallRealMethod().when(reconUtilsMock).untarCheckpointFile(any(), any());
return reconUtilsMock;
}
private OzoneManagerProtocol getMockOzoneManagerClient(
DBUpdatesWrapper dbUpdatesWrapper) throws IOException {
OzoneManagerProtocol ozoneManagerProtocolMock =
mock(OzoneManagerProtocol.class);
when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
.DBUpdatesRequest.class))).thenReturn(dbUpdatesWrapper);
return ozoneManagerProtocolMock;
}
}

View File

@ -29,13 +29,14 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
* Dummy Recon task that has 3 modes of operations.
* ALWAYS_FAIL / FAIL_ONCE / ALWAYS_PASS
*/
public class DummyReconDBTask extends ReconDBUpdateTask {
public class DummyReconDBTask implements ReconDBUpdateTask {
private int numFailuresAllowed = Integer.MIN_VALUE;
private int callCtr = 0;
private String taskName;
public DummyReconDBTask(String taskName, TaskType taskType) {
super(taskName);
DummyReconDBTask(String taskName, TaskType taskType) {
this.taskName = taskName;
if (taskType.equals(TaskType.FAIL_ONCE)) {
numFailuresAllowed = 1;
} else if (taskType.equals(TaskType.ALWAYS_FAIL)) {
@ -44,12 +45,17 @@ public class DummyReconDBTask extends ReconDBUpdateTask {
}
@Override
protected Collection<String> getTaskTables() {
public String getTaskName() {
return taskName;
}
@Override
public Collection<String> getTaskTables() {
return Collections.singletonList("volumeTable");
}
@Override
Pair<String, Boolean> process(OMUpdateEventBatch events) {
public Pair<String, Boolean> process(OMUpdateEventBatch events) {
if (++callCtr <= numFailuresAllowed) {
return new ImmutablePair<>(getTaskName(), false);
} else {
@ -58,7 +64,7 @@ public class DummyReconDBTask extends ReconDBUpdateTask {
}
@Override
Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
if (++callCtr <= numFailuresAllowed) {
return new ImmutablePair<>(getTaskName(), false);
} else {

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.recon.tasks;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@ -28,7 +30,6 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@ -37,31 +38,22 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.hadoop.utils.db.Table;
import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
import org.jooq.impl.DSL;
import org.jooq.impl.DefaultConfiguration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.inject.Injector;
import javax.sql.DataSource;
/**
* Unit test for Container Key mapper task.
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
@PrepareForTest(ReconUtils.class)
public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
private ContainerDBServiceProvider containerDbServiceProvider;
@ -77,16 +69,9 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
return injector;
}
@Rule
TemporaryFolder temporaryFolder = new TemporaryFolder();
private void initializeInjector() throws Exception {
omMetadataManager = initializeNewOmMetadataManager();
OzoneConfiguration configuration =
guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder);
ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
configuration);
ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider();
reconOMMetadataManager = getTestMetadataManager(omMetadataManager);
injector = guiceInjectorTest.getInjector(
@ -151,10 +136,8 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
Collections.singletonList(omKeyLocationInfoGroup));
ContainerKeyMapperTask containerKeyMapperTask =
new ContainerKeyMapperTask(containerDbServiceProvider,
ozoneManagerServiceProvider.getOMMetadataManagerInstance());
containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
.getOMMetadataManagerInstance());
new ContainerKeyMapperTask(containerDbServiceProvider);
containerKeyMapperTask.reprocess(reconOMMetadataManager);
keyPrefixesForContainer =
containerDbServiceProvider.getKeyPrefixesForContainer(1);
@ -258,10 +241,8 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
}});
ContainerKeyMapperTask containerKeyMapperTask =
new ContainerKeyMapperTask(containerDbServiceProvider,
ozoneManagerServiceProvider.getOMMetadataManagerInstance());
containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
.getOMMetadataManagerInstance());
new ContainerKeyMapperTask(containerDbServiceProvider);
containerKeyMapperTask.reprocess(reconOMMetadataManager);
keyPrefixesForContainer = containerDbServiceProvider
.getKeyPrefixesForContainer(1);
@ -317,4 +298,17 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
omKeyLocationInfoGroup))
.build();
}
private OzoneManagerServiceProviderImpl getMockOzoneManagerServiceProvider()
throws IOException {
OzoneManagerServiceProviderImpl omServiceProviderMock =
mock(OzoneManagerServiceProviderImpl.class);
OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
Table tableMock = mock(Table.class);
when(tableMock.getName()).thenReturn("keyTable");
when(omMetadataManagerMock.getKeyTable()).thenReturn(tableMock);
when(omServiceProviderMock.getOMMetadataManagerInstance())
.thenReturn(omMetadataManagerMock);
return omServiceProviderMock;
}
}

View File

@ -24,31 +24,21 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.utils.db.TypedTable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
import static org.apache.hadoop.ozone.recon.tasks.
OMDBUpdateEvent.OMDBUpdateAction.PUT;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.mockito.Mockito.when;
/**
* Unit test for File Size Count Task.
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
@PrepareForTest(OmKeyInfo.class)
public class TestFileSizeCountTask {
@Test
public void testCalculateBinIndex() {

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.recon.tasks;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -28,14 +27,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.persistence.AbstractSqlDatabaseTest;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
@ -50,16 +48,12 @@ import org.junit.Test;
public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
private ReconTaskController reconTaskController;
private Configuration sqlConfiguration;
@Before
public void setUp() throws Exception {
File omDbDir = temporaryFolder.newFolder();
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
ozoneConfiguration.set(OZONE_OM_DB_DIRS, omDbDir.getAbsolutePath());
ReconOMMetadataManager omMetadataManager = new ReconOmMetadataManagerImpl(
ozoneConfiguration);
sqlConfiguration = getInjector()
.getInstance(Configuration.class);
@ -69,7 +63,7 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
schemaDefinition.initializeSchema();
reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration,
omMetadataManager, sqlConfiguration);
sqlConfiguration, new HashSet<>());
}
@Test
@ -86,15 +80,17 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
@Test
public void testConsumeOMEvents() throws Exception {
ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class);
when(reconDBUpdateTaskMock.getTaskTables()).thenReturn(Collections
.EMPTY_LIST);
when(reconDBUpdateTaskMock.getTaskName()).thenReturn("MockTask");
ReconDBUpdateTask reconDBUpdateTaskMock = getMockTask("MockTask");
when(reconDBUpdateTaskMock.process(any(OMUpdateEventBatch.class)))
.thenReturn(new ImmutablePair<>("MockTask", true));
reconTaskController.registerTask(reconDBUpdateTaskMock);
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
when(omUpdateEventBatchMock.filter(Collections.singleton("MockTable")))
.thenReturn(omUpdateEventBatchMock);
reconTaskController.consumeOMEvents(
new OMUpdateEventBatch(Collections.emptyList()));
omUpdateEventBatchMock,
mock(OMMetadataManager.class));
verify(reconDBUpdateTaskMock, times(1))
.process(any());
@ -107,17 +103,13 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.FAIL_ONCE);
reconTaskController.registerTask(dummyReconDBTask);
long currentTime = System.nanoTime();
OMDBUpdateEvent.EventInfo eventInfoMock = mock(
OMDBUpdateEvent.EventInfo.class);
when(eventInfoMock.getSequenceNumber()).thenReturn(100L);
when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime);
long currentTime = System.currentTimeMillis();
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock);
when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
reconTaskController.consumeOMEvents(omUpdateEventBatchMock);
reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
mock(OMMetadataManager.class));
assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
.get(dummyReconDBTask.getTaskName()));
@ -126,8 +118,8 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
ReconTaskStatus dbRecord = dao.findById(taskName);
Assert.assertEquals(taskName, dbRecord.getTaskName());
Assert.assertEquals(Long.valueOf(currentTime),
dbRecord.getLastUpdatedTimestamp());
Assert.assertTrue(
dbRecord.getLastUpdatedTimestamp() > currentTime);
Assert.assertEquals(Long.valueOf(100L), dbRecord.getLastUpdatedSeqNumber());
}
@ -138,18 +130,14 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_FAIL);
reconTaskController.registerTask(dummyReconDBTask);
long currentTime = System.nanoTime();
OMDBUpdateEvent.EventInfo eventInfoMock =
mock(OMDBUpdateEvent.EventInfo.class);
when(eventInfoMock.getSequenceNumber()).thenReturn(100L);
when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime);
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock);
when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
for (int i = 0; i < 2; i++) {
reconTaskController.consumeOMEvents(omUpdateEventBatchMock);
reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
omMetadataManagerMock);
assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
@ -157,8 +145,8 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
}
//Should be blacklisted now.
reconTaskController.consumeOMEvents(
new OMUpdateEventBatch(Collections.emptyList()));
reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
omMetadataManagerMock);
assertTrue(reconTaskController.getRegisteredTasks().isEmpty());
ReconTaskStatusDao dao = new ReconTaskStatusDao(sqlConfiguration);
@ -168,4 +156,36 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedTimestamp());
Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedSeqNumber());
}
@Test
public void testReInitializeTasks() throws Exception {
OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
ReconDBUpdateTask reconDBUpdateTaskMock =
getMockTask("MockTask2");
when(reconDBUpdateTaskMock.reprocess(omMetadataManagerMock))
.thenReturn(new ImmutablePair<>("MockTask2", true));
reconTaskController.registerTask(reconDBUpdateTaskMock);
reconTaskController.reInitializeTasks(omMetadataManagerMock);
verify(reconDBUpdateTaskMock, times(1))
.reprocess(omMetadataManagerMock);
}
/**
* Helper method for getting a mocked Task.
* @param taskName name of the task.
* @return instance of ReconDBUpdateTask.
*/
private ReconDBUpdateTask getMockTask(String taskName) {
ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class);
when(reconDBUpdateTaskMock.getTaskTables()).thenReturn(Collections
.EMPTY_LIST);
when(reconDBUpdateTaskMock.getTaskName()).thenReturn(taskName);
when(reconDBUpdateTaskMock.getTaskTables())
.thenReturn(Collections.singleton("MockTable"));
return reconDBUpdateTaskMock;
}
}