HDDS-1981: Datanode should sync db when container is moved to CLOSED or QUASI_CLOSED state (#1319)

This commit is contained in:
Lokesh Jain 2019-08-27 10:22:03 +05:30 committed by GitHub
parent 567091aa9b
commit 4379370fb1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 59 additions and 4 deletions

View File

@ -240,6 +240,12 @@ public void compactDB() throws IOException {
} }
} }
@Override
public void flushDB(boolean sync) {
// TODO: Implement flush for level db
// do nothing
}
@Override @Override
public void writeBatch(BatchOperation operation) throws IOException { public void writeBatch(BatchOperation operation) throws IOException {
List<BatchOperation.SingleOperation> operations = List<BatchOperation.SingleOperation> operations =

View File

@ -131,6 +131,12 @@ List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
*/ */
void compactDB() throws IOException; void compactDB() throws IOException;
/**
* Flush the outstanding I/O operations of the DB.
* @param sync if true will sync the outstanding I/Os to the disk.
*/
void flushDB(boolean sync) throws IOException;
/** /**
* Destroy the content of the specified database, * Destroy the content of the specified database,
* a destroyed database will not be able to load again. * a destroyed database will not be able to load again.

View File

@ -279,6 +279,19 @@ public void compactDB() throws IOException {
} }
} }
@Override
public void flushDB(boolean sync) throws IOException {
if (db != null) {
try {
// for RocksDB it is sufficient to flush the WAL as entire db can
// be reconstructed using it.
db.flushWal(sync);
} catch (RocksDBException e) {
throw toIOException("Failed to flush db", e);
}
}
}
private void deleteQuietly(File fileOrDir) { private void deleteQuietly(File fileOrDir) {
if (fileOrDir != null && fileOrDir.exists()) { if (fileOrDir != null && fileOrDir.exists()) {
try { try {

View File

@ -144,6 +144,7 @@ enum Result {
CONTAINER_NOT_OPEN = 39; CONTAINER_NOT_OPEN = 39;
CONTAINER_MISSING = 40; CONTAINER_MISSING = 40;
BLOCK_TOKEN_VERIFICATION_FAILED = 41; BLOCK_TOKEN_VERIFICATION_FAILED = 41;
ERROR_IN_DB_SYNC = 42;
} }
/** /**

View File

@ -68,6 +68,8 @@
.Result.DISK_OUT_OF_SPACE; .Result.DISK_OUT_OF_SPACE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.ERROR_IN_COMPACT_DB; .Result.ERROR_IN_COMPACT_DB;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.ERROR_IN_DB_SYNC;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.INVALID_CONTAINER_STATE; .Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@ -298,8 +300,14 @@ public void markContainerUnhealthy() throws StorageContainerException {
@Override @Override
public void quasiClose() throws StorageContainerException { public void quasiClose() throws StorageContainerException {
// The DB must be synced during close operation
flushAndSyncDB();
writeLock(); writeLock();
try { try {
// Second sync should be a very light operation as sync has already
// been done outside the lock.
flushAndSyncDB();
updateContainerData(containerData::quasiCloseContainer); updateContainerData(containerData::quasiCloseContainer);
} finally { } finally {
writeUnlock(); writeUnlock();
@ -308,16 +316,18 @@ public void quasiClose() throws StorageContainerException {
@Override @Override
public void close() throws StorageContainerException { public void close() throws StorageContainerException {
// The DB must be synced during close operation
flushAndSyncDB();
writeLock(); writeLock();
try { try {
// Second sync should be a very light operation as sync has already
// been done outside the lock.
flushAndSyncDB();
updateContainerData(containerData::closeContainer); updateContainerData(containerData::closeContainer);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
// It is ok if this operation takes a bit of time.
// Close container is not expected to be instantaneous.
compactDB();
} }
/** /**
@ -365,6 +375,22 @@ void compactDB() throws StorageContainerException {
} }
} }
private void flushAndSyncDB() throws StorageContainerException {
try {
try (ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
db.getStore().flushDB(true);
LOG.info("Container {} is synced with bcsId {}.",
containerData.getContainerID(),
containerData.getBlockCommitSequenceId());
}
} catch (StorageContainerException ex) {
throw ex;
} catch (IOException ex) {
LOG.error("Error in DB sync while closing container", ex);
throw new StorageContainerException(ex, ERROR_IN_DB_SYNC);
}
}
@Override @Override
public KeyValueContainerData getContainerData() { public KeyValueContainerData getContainerData() {
return containerData; return containerData;

View File

@ -155,6 +155,9 @@ public void testMarkClosedContainerAsUnhealthy() throws IOException {
*/ */
@Test @Test
public void testMarkQuasiClosedContainerAsUnhealthy() throws IOException { public void testMarkQuasiClosedContainerAsUnhealthy() throws IOException {
// We need to create the container so the sync-on-quasi-close operation
// does not NPE.
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.quasiClose(); keyValueContainer.quasiClose();
keyValueContainer.markContainerUnhealthy(); keyValueContainer.markContainerUnhealthy();
assertThat(keyValueContainerData.getState(), is(UNHEALTHY)); assertThat(keyValueContainerData.getState(), is(UNHEALTHY));