HDDS-1085. Create an OM API to serve snapshots to Recon server.
Contributed by Aravindan Vijayan.
This commit is contained in:
parent
67af509097
commit
588b4c4d78
@ -114,10 +114,13 @@ public final class OzoneConsts {
|
||||
public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
|
||||
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
|
||||
public static final String OM_DB_NAME = "om.db";
|
||||
public static final String OM_DB_CHECKPOINTS_DIR_NAME = "om.db.checkpoints";
|
||||
public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
|
||||
public static final String SCM_DB_NAME = "scm.db";
|
||||
|
||||
public static final String STORAGE_DIR_CHUNKS = "chunks";
|
||||
public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
|
||||
"flushBeforeCheckpoint";
|
||||
|
||||
/**
|
||||
* Supports Bucket Versioning.
|
||||
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.utils.db;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
|
||||
/**
|
||||
* Generic DB Checkpoint interface.
|
||||
*/
|
||||
public interface DBCheckpointSnapshot {
|
||||
|
||||
/**
|
||||
* Get Snapshot location.
|
||||
*/
|
||||
Path getCheckpointLocation();
|
||||
|
||||
/**
|
||||
* Get Snapshot creation timestamp.
|
||||
*/
|
||||
long getCheckpointTimestamp();
|
||||
|
||||
/**
|
||||
* Get last sequence number of Snapshot.
|
||||
*/
|
||||
long getLatestSequenceNumber();
|
||||
|
||||
/**
|
||||
* Destroy the contents of the specified checkpoint to ensure
|
||||
* proper cleanup of the footprint on disk.
|
||||
*
|
||||
* @throws IOException if I/O error happens
|
||||
*/
|
||||
void cleanupCheckpoint() throws IOException;
|
||||
|
||||
}
|
@ -137,5 +137,12 @@ <KEY, VALUE> void move(KEY sourceKey, KEY destKey, VALUE value,
|
||||
*/
|
||||
void commitBatchOperation(BatchOperation operation) throws IOException;
|
||||
|
||||
/**
|
||||
* Get current snapshot of OM DB store as an artifact stored on
|
||||
* the local filesystem.
|
||||
* @return An object that encapsulates the checkpoint information along with
|
||||
* location.
|
||||
*/
|
||||
DBCheckpointSnapshot getCheckpointSnapshot(boolean flush) throws IOException;
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.utils.db;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.rocksdb.Checkpoint;
|
||||
import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* RocksDB Checkpoint Manager, used to create and cleanup checkpoints.
|
||||
*/
|
||||
public class RDBCheckpointManager {
|
||||
|
||||
private final Checkpoint checkpoint;
|
||||
private final RocksDB db;
|
||||
public static final String RDB_CHECKPOINT_DIR_PREFIX = "rdb_checkpoint_";
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RDBCheckpointManager.class);
|
||||
public static final String JAVA_TMP_DIR = "java.io.tmpdir";
|
||||
private String checkpointNamePrefix = "";
|
||||
|
||||
public RDBCheckpointManager(RocksDB rocksDB) {
|
||||
this.db = rocksDB;
|
||||
this.checkpoint = Checkpoint.create(rocksDB);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a checkpoint manager with a prefix to be added to the
|
||||
* snapshots created.
|
||||
*
|
||||
* @param rocksDB DB instance
|
||||
* @param checkpointPrefix prefix string.
|
||||
*/
|
||||
public RDBCheckpointManager(RocksDB rocksDB, String checkpointPrefix) {
|
||||
this.db = rocksDB;
|
||||
this.checkpointNamePrefix = checkpointPrefix;
|
||||
this.checkpoint = Checkpoint.create(rocksDB);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create RocksDB snapshot by saving a checkpoint to a directory.
|
||||
*
|
||||
* @param parentDir The directory where the checkpoint needs to be created.
|
||||
* @return RocksDB specific Checkpoint information object.
|
||||
*/
|
||||
public RocksDBCheckpointSnapshot createCheckpointSnapshot(String parentDir)
|
||||
throws IOException {
|
||||
try {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
|
||||
String checkpointDir = StringUtils.EMPTY;
|
||||
if (StringUtils.isNotEmpty(checkpointNamePrefix)) {
|
||||
checkpointDir += checkpointNamePrefix;
|
||||
}
|
||||
checkpointDir += "_" + RDB_CHECKPOINT_DIR_PREFIX + currentTime;
|
||||
|
||||
Path checkpointPath = Paths.get(parentDir, checkpointDir);
|
||||
checkpoint.createCheckpoint(checkpointPath.toString());
|
||||
|
||||
return new RocksDBCheckpointSnapshot(
|
||||
checkpointPath,
|
||||
currentTime,
|
||||
db.getLatestSequenceNumber()); //Best guesstimate here. Not accurate.
|
||||
|
||||
} catch (RocksDBException e) {
|
||||
LOG.error("Unable to create RocksDB Snapshot.", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
class RocksDBCheckpointSnapshot implements DBCheckpointSnapshot {
|
||||
|
||||
private Path checkpointLocation;
|
||||
private long checkpointTimestamp;
|
||||
private long latestSequenceNumber;
|
||||
|
||||
RocksDBCheckpointSnapshot(Path checkpointLocation,
|
||||
long snapshotTimestamp,
|
||||
long latestSequenceNumber) {
|
||||
this.checkpointLocation = checkpointLocation;
|
||||
this.checkpointTimestamp = snapshotTimestamp;
|
||||
this.latestSequenceNumber = latestSequenceNumber;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getCheckpointLocation() {
|
||||
return this.checkpointLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCheckpointTimestamp() {
|
||||
return this.checkpointTimestamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLatestSequenceNumber() {
|
||||
return this.latestSequenceNumber;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanupCheckpoint() throws IOException {
|
||||
FileUtils.deleteDirectory(checkpointLocation.toFile());
|
||||
}
|
||||
}
|
||||
}
|
@ -19,9 +19,12 @@
|
||||
|
||||
package org.apache.hadoop.utils.db;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_CHECKPOINTS_DIR_NAME;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Hashtable;
|
||||
@ -39,6 +42,7 @@
|
||||
import org.rocksdb.ColumnFamilyDescriptor;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.DBOptions;
|
||||
import org.rocksdb.FlushOptions;
|
||||
import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.WriteOptions;
|
||||
@ -58,6 +62,8 @@ public class RDBStore implements DBStore {
|
||||
private final CodecRegistry codecRegistry;
|
||||
private final Hashtable<String, ColumnFamilyHandle> handleTable;
|
||||
private ObjectName statMBeanName;
|
||||
private RDBCheckpointManager checkPointManager;
|
||||
private final String checkpointsParentDir;
|
||||
|
||||
@VisibleForTesting
|
||||
public RDBStore(File dbFile, DBOptions options,
|
||||
@ -108,6 +114,17 @@ public RDBStore(File dbFile, DBOptions options, Set<TableConfig> families,
|
||||
}
|
||||
}
|
||||
|
||||
//create checkpoints directory if not exists.
|
||||
checkpointsParentDir = Paths.get(dbLocation.getParent(),
|
||||
OM_DB_CHECKPOINTS_DIR_NAME).toString();
|
||||
File checkpointsDir = new File(checkpointsParentDir);
|
||||
if (!checkpointsDir.exists()) {
|
||||
checkpointsDir.mkdir();
|
||||
}
|
||||
|
||||
//Initialize checkpoint manager
|
||||
checkPointManager = new RDBCheckpointManager(db, "om");
|
||||
|
||||
} catch (RocksDBException e) {
|
||||
throw toIOException(
|
||||
"Failed init RocksDB, db path : " + dbFile.getAbsolutePath(), e);
|
||||
@ -246,4 +263,20 @@ public ArrayList<Table> listTables() throws IOException {
|
||||
}
|
||||
return returnList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DBCheckpointSnapshot getCheckpointSnapshot(boolean flush)
|
||||
throws IOException {
|
||||
if (flush) {
|
||||
final FlushOptions flushOptions =
|
||||
new FlushOptions().setWaitForFlush(true);
|
||||
try {
|
||||
db.flush(flushOptions);
|
||||
} catch (RocksDBException e) {
|
||||
LOG.error("Unable to Flush RocksDB data before creating snapshot", e);
|
||||
}
|
||||
}
|
||||
return checkPointManager.createCheckpointSnapshot(checkpointsParentDir);
|
||||
}
|
||||
|
||||
}
|
@ -20,9 +20,11 @@
|
||||
package org.apache.hadoop.utils.db;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@ -87,21 +89,26 @@ public void tearDown() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
private void insertRandomData(RDBStore dbStore, int familyIndex)
|
||||
throws Exception {
|
||||
try (Table firstTable = dbStore.getTable(families.get(familyIndex))) {
|
||||
Assert.assertNotNull("Table cannot be null", firstTable);
|
||||
for (int x = 0; x < 100; x++) {
|
||||
byte[] key =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
byte[] value =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
firstTable.put(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void compactDB() throws Exception {
|
||||
try (RDBStore newStore =
|
||||
new RDBStore(folder.newFolder(), options, configSet)) {
|
||||
Assert.assertNotNull("DB Store cannot be null", newStore);
|
||||
try (Table firstTable = newStore.getTable(families.get(1))) {
|
||||
Assert.assertNotNull("Table cannot be null", firstTable);
|
||||
for (int x = 0; x < 100; x++) {
|
||||
byte[] key =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
byte[] value =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
firstTable.put(key, value);
|
||||
}
|
||||
}
|
||||
insertRandomData(newStore, 1);
|
||||
// This test does not assert anything if there is any error this test
|
||||
// will throw and fail.
|
||||
newStore.compactDB();
|
||||
@ -171,29 +178,13 @@ public void getEstimatedKeyCount() throws Exception {
|
||||
try (RDBStore newStore =
|
||||
new RDBStore(folder.newFolder(), options, configSet)) {
|
||||
Assert.assertNotNull("DB Store cannot be null", newStore);
|
||||
|
||||
// Write 100 keys to the first table.
|
||||
try (Table firstTable = newStore.getTable(families.get(1))) {
|
||||
Assert.assertNotNull("Table cannot be null", firstTable);
|
||||
for (int x = 0; x < 100; x++) {
|
||||
byte[] key =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
byte[] value =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
firstTable.put(key, value);
|
||||
}
|
||||
}
|
||||
insertRandomData(newStore, 1);
|
||||
|
||||
// Write 100 keys to the secondTable table.
|
||||
try (Table secondTable = newStore.getTable(families.get(2))) {
|
||||
Assert.assertNotNull("Table cannot be null", secondTable);
|
||||
for (int x = 0; x < 100; x++) {
|
||||
byte[] key =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
byte[] value =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
secondTable.put(key, value);
|
||||
}
|
||||
}
|
||||
insertRandomData(newStore, 2);
|
||||
|
||||
// Let us make sure that our estimate is not off by 10%
|
||||
Assert.assertTrue(newStore.getEstimatedKeyCount() > 180
|
||||
|| newStore.getEstimatedKeyCount() < 220);
|
||||
@ -255,4 +246,47 @@ public void listTables() throws Exception {
|
||||
}
|
||||
Assert.assertEquals(0, count);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRocksDBCheckpoint() throws Exception {
|
||||
try (RDBStore newStore =
|
||||
new RDBStore(folder.newFolder(), options, configSet)) {
|
||||
Assert.assertNotNull("DB Store cannot be null", newStore);
|
||||
|
||||
insertRandomData(newStore, 1);
|
||||
DBCheckpointSnapshot checkpointSnapshot =
|
||||
newStore.getCheckpointSnapshot(true);
|
||||
Assert.assertNotNull(checkpointSnapshot);
|
||||
|
||||
RDBStore restoredStoreFromCheckPoint =
|
||||
new RDBStore(checkpointSnapshot.getCheckpointLocation().toFile(),
|
||||
options, configSet);
|
||||
|
||||
// Let us make sure that our estimate is not off by 10%
|
||||
Assert.assertTrue(
|
||||
restoredStoreFromCheckPoint.getEstimatedKeyCount() > 90
|
||||
|| restoredStoreFromCheckPoint.getEstimatedKeyCount() < 110);
|
||||
checkpointSnapshot.cleanupCheckpoint();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRocksDBCheckpointCleanup() throws Exception {
|
||||
try (RDBStore newStore =
|
||||
new RDBStore(folder.newFolder(), options, configSet)) {
|
||||
Assert.assertNotNull("DB Store cannot be null", newStore);
|
||||
|
||||
insertRandomData(newStore, 1);
|
||||
DBCheckpointSnapshot checkpointSnapshot =
|
||||
newStore.getCheckpointSnapshot(true);
|
||||
Assert.assertNotNull(checkpointSnapshot);
|
||||
|
||||
Assert.assertTrue(Files.exists(
|
||||
checkpointSnapshot.getCheckpointLocation()));
|
||||
checkpointSnapshot.cleanupCheckpoint();
|
||||
Assert.assertFalse(Files.exists(
|
||||
checkpointSnapshot.getCheckpointLocation()));
|
||||
}
|
||||
}
|
||||
}
|
@ -30,6 +30,10 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -18,16 +18,25 @@
|
||||
package org.apache.hadoop.ozone;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
|
||||
import org.apache.commons.compress.utils.IOUtils;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.ScmUtils;
|
||||
@ -287,4 +296,57 @@ public static Collection<String> emptyAsSingletonNull(Collection<String>
|
||||
return coll;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a source directory, create a tar.gz file from it.
|
||||
*
|
||||
* @param sourcePath the path to the directory to be archived.
|
||||
* @return tar.gz file
|
||||
* @throws IOException
|
||||
*/
|
||||
public static File createTarFile(Path sourcePath) throws IOException {
|
||||
TarArchiveOutputStream tarOs = null;
|
||||
try {
|
||||
String sourceDir = sourcePath.toString();
|
||||
String fileName = sourceDir.concat(".tar.gz");
|
||||
FileOutputStream fileOutputStream = new FileOutputStream(fileName);
|
||||
GZIPOutputStream gzipOutputStream =
|
||||
new GZIPOutputStream(new BufferedOutputStream(fileOutputStream));
|
||||
tarOs = new TarArchiveOutputStream(gzipOutputStream);
|
||||
File folder = new File(sourceDir);
|
||||
File[] filesInDir = folder.listFiles();
|
||||
for (File file : filesInDir) {
|
||||
addFilesToArchive(file.getName(), file, tarOs);
|
||||
}
|
||||
return new File(fileName);
|
||||
} finally {
|
||||
try {
|
||||
org.apache.hadoop.io.IOUtils.closeStream(tarOs);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered when closing " +
|
||||
"TAR file output stream: " + e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void addFilesToArchive(String source, File file,
|
||||
TarArchiveOutputStream
|
||||
tarFileOutputStream)
|
||||
throws IOException {
|
||||
tarFileOutputStream.putArchiveEntry(new TarArchiveEntry(file, source));
|
||||
if (file.isFile()) {
|
||||
FileInputStream fileInputStream = new FileInputStream(file);
|
||||
BufferedInputStream bufferedInputStream =
|
||||
new BufferedInputStream(fileInputStream);
|
||||
IOUtils.copy(bufferedInputStream, tarFileOutputStream);
|
||||
tarFileOutputStream.closeArchiveEntry();
|
||||
fileInputStream.close();
|
||||
} else if (file.isDirectory()) {
|
||||
tarFileOutputStream.closeArchiveEntry();
|
||||
for (File cFile : file.listFiles()) {
|
||||
addFilesToArchive(cFile.getAbsolutePath(), cFile, tarFileOutputStream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -204,4 +204,10 @@ private OMConfigKeys() {
|
||||
"ozone.manager.delegation.token.max-lifetime";
|
||||
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
|
||||
7*24*60*60*1000; // 7 days
|
||||
|
||||
public static final String OZONE_DB_SNAPSHOT_TRANSFER_RATE_KEY =
|
||||
"ozone.manager.db.snapshot.transfer.bandwidthPerSec";
|
||||
public static final long OZONE_DB_SNAPSHOT_TRANSFER_RATE_DEFAULT =
|
||||
0; //no throttling
|
||||
|
||||
}
|
||||
|
@ -19,17 +19,23 @@
|
||||
package org.apache.hadoop.ozone;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
@ -88,4 +94,42 @@ public void testNoOmDbDirConfigured() {
|
||||
thrown.expect(IllegalArgumentException.class);
|
||||
OmUtils.getOmDbDir(new OzoneConfiguration());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateTarFile() throws Exception {
|
||||
|
||||
File tempSnapshotDir = null;
|
||||
FileInputStream fis = null;
|
||||
FileOutputStream fos = null;
|
||||
File tarFile = null;
|
||||
|
||||
try {
|
||||
String testDirName = System.getProperty("java.io.tmpdir");
|
||||
if (!testDirName.endsWith("/")) {
|
||||
testDirName += "/";
|
||||
}
|
||||
testDirName += "TestCreateTarFile_Dir" + System.currentTimeMillis();
|
||||
tempSnapshotDir = new File(testDirName);
|
||||
tempSnapshotDir.mkdirs();
|
||||
|
||||
File file = new File(testDirName + "/temp1.txt");
|
||||
FileWriter writer = new FileWriter(file);
|
||||
writer.write("Test data 1");
|
||||
writer.close();
|
||||
|
||||
file = new File(testDirName + "/temp2.txt");
|
||||
writer = new FileWriter(file);
|
||||
writer.write("Test data 2");
|
||||
writer.close();
|
||||
|
||||
tarFile = OmUtils.createTarFile(Paths.get(testDirName));
|
||||
Assert.assertNotNull(tarFile);
|
||||
|
||||
} finally {
|
||||
IOUtils.closeStream(fis);
|
||||
IOUtils.closeStream(fos);
|
||||
FileUtils.deleteDirectory(tempSnapshotDir);
|
||||
FileUtils.deleteQuietly(tarFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,142 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.
|
||||
OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.apache.hadoop.utils.db.DBCheckpointSnapshot;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Provides the current checkpoint Snapshot of the OM DB. (tar.gz)
|
||||
*/
|
||||
public class OMDbSnapshotServlet extends HttpServlet {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(OMDbSnapshotServlet.class);
|
||||
|
||||
private transient DBStore omDbStore;
|
||||
private DataTransferThrottler throttler = null;
|
||||
|
||||
@Override
|
||||
public void init() throws ServletException {
|
||||
|
||||
OzoneManager om = (OzoneManager) getServletContext()
|
||||
.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
|
||||
|
||||
if (om == null) {
|
||||
LOG.error("Unable to initialize OMDbSnapshotServlet. OM is null");
|
||||
return;
|
||||
}
|
||||
|
||||
omDbStore = om.getMetadataManager().getStore();
|
||||
OzoneConfiguration configuration = om.getConfiguration();
|
||||
long transferBandwidth = configuration.getLongBytes(
|
||||
OMConfigKeys.OZONE_DB_SNAPSHOT_TRANSFER_RATE_KEY,
|
||||
OMConfigKeys.OZONE_DB_SNAPSHOT_TRANSFER_RATE_DEFAULT);
|
||||
|
||||
if (transferBandwidth > 0) {
|
||||
throttler = new DataTransferThrottler(transferBandwidth);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a GET request for the Ozone Manager DB checkpoint snapshot.
|
||||
*
|
||||
* @param request The servlet request we are processing
|
||||
* @param response The servlet response we are creating
|
||||
*/
|
||||
@Override
|
||||
public void doGet(HttpServletRequest request, HttpServletResponse response) {
|
||||
|
||||
LOG.info("Received request to obtain OM DB checkpoint snapshot");
|
||||
if (omDbStore == null) {
|
||||
LOG.error(
|
||||
"Unable to process metadata snapshot request. DB Store is null");
|
||||
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
FileInputStream checkpointFileInputStream = null;
|
||||
File checkPointTarFile = null;
|
||||
try {
|
||||
|
||||
boolean flush = false;
|
||||
String flushParam =
|
||||
request.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH);
|
||||
if (StringUtils.isNotEmpty(flushParam)) {
|
||||
flush = Boolean.valueOf(flushParam);
|
||||
}
|
||||
|
||||
DBCheckpointSnapshot checkpoint = omDbStore.getCheckpointSnapshot(flush);
|
||||
if (checkpoint == null) {
|
||||
LOG.error("Unable to process metadata snapshot request. " +
|
||||
"Checkpoint request returned null.");
|
||||
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
LOG.info("Tar location = " + checkPointTarFile.getAbsolutePath());
|
||||
checkPointTarFile = OmUtils.createTarFile(
|
||||
checkpoint.getCheckpointLocation());
|
||||
LOG.info("Tar location = " + checkPointTarFile.getAbsolutePath());
|
||||
response.setContentType("application/x-tgz");
|
||||
response.setHeader("Content-Disposition",
|
||||
"attachment; filename=\"" +
|
||||
checkPointTarFile.getName() + "\"");
|
||||
|
||||
checkpointFileInputStream = new FileInputStream(checkPointTarFile);
|
||||
TransferFsImage.copyFileToStream(response.getOutputStream(),
|
||||
checkPointTarFile,
|
||||
checkpointFileInputStream,
|
||||
throttler);
|
||||
|
||||
checkpoint.cleanupCheckpoint();
|
||||
} catch (IOException e) {
|
||||
LOG.error(
|
||||
"Unable to process metadata snapshot request. ", e);
|
||||
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||
} finally {
|
||||
if (checkPointTarFile != null) {
|
||||
FileUtils.deleteQuietly(checkPointTarFile);
|
||||
}
|
||||
IOUtils.closeStream(checkpointFileInputStream);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -2428,6 +2428,10 @@ public static Logger getLogger() {
|
||||
return LOG;
|
||||
}
|
||||
|
||||
public OzoneConfiguration getConfiguration() {
|
||||
return configuration;
|
||||
}
|
||||
|
||||
public static void setTestSecureOmFlag(boolean testSecureOmFlag) {
|
||||
OzoneManager.testSecureOmFlag = testSecureOmFlag;
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ public OzoneManagerHttpServer(Configuration conf, OzoneManager om)
|
||||
throws IOException {
|
||||
super(conf, "ozoneManager");
|
||||
addServlet("serviceList", "/serviceList", ServiceListJSONServlet.class);
|
||||
addServlet("dbSnapshot", "/dbSnapshot", OMDbSnapshotServlet.class);
|
||||
getWebAppContext().setAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE, om);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user