HDFS-16847: RBF: Prevents StateStoreFileSystemImpl from committing tmp file after encountering an IOException. (#5145)

Author: Simbarashe Dzinamarira
Date:   2022-11-28 16:47:01 -08:00 (committed by GitHub)
Commit: ec2856d79c
Parent: f93167e678
5 changed files with 58 additions and 26 deletions
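Summary: the file-based State Store drivers write each record to a temporary file and then rename it into place to commit it. Previously, an IOException thrown while writing the temporary file was only logged, and the rename still ran, committing a partial or empty record. This change closes the writer with try-with-resources, adds a recordWrittenSuccessfully flag so a failed write skips the commit rename, widens getWriter() to public @VisibleForTesting so tests can inject a failing writer, and replaces a DistributedFileSystem-specific rename with FileUtil.rename(..., Options.Rename.OVERWRITE).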

StateStoreFileBaseImpl.java

@@ -85,7 +85,8 @@ protected abstract <T extends BaseRecord> BufferedReader getReader(
    * @param path Path of the record to write.
    * @return Writer for the record.
    */
-  protected abstract <T extends BaseRecord> BufferedWriter getWriter(
+  @VisibleForTesting
+  public abstract <T extends BaseRecord> BufferedWriter getWriter(
       String path);
 
   /**
@@ -348,25 +349,18 @@ public <T extends BaseRecord> boolean putAll(
     for (Entry<String, T> entry : toWrite.entrySet()) {
       String recordPath = entry.getKey();
       String recordPathTemp = recordPath + "." + now() + TMP_MARK;
-      BufferedWriter writer = getWriter(recordPathTemp);
-      try {
+      boolean recordWrittenSuccessfully = true;
+      try (BufferedWriter writer = getWriter(recordPathTemp)) {
         T record = entry.getValue();
         String line = serializeString(record);
         writer.write(line);
       } catch (IOException e) {
         LOG.error("Cannot write {}", recordPathTemp, e);
+        recordWrittenSuccessfully = false;
         success = false;
-      } finally {
-        if (writer != null) {
-          try {
-            writer.close();
-          } catch (IOException e) {
-            LOG.error("Cannot close the writer for {}", recordPathTemp, e);
-          }
-        }
       }
       // Commit
-      if (!rename(recordPathTemp, recordPath)) {
+      if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
         LOG.error("Failed committing record into {}", recordPath);
         success = false;
       }
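For readers outside the Hadoop tree, here is a minimal, self-contained sketch of the same write-to-tmp-then-commit pattern using java.nio; the class and file names are hypothetical and not part of the commit:

// Hypothetical illustration (not part of the commit) of the pattern putAll() uses above.
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class TmpCommitSketch {
  // Writes one line to 'target' through a temporary sibling file. The rename
  // (the "commit") only runs when the write succeeded, mirroring the
  // recordWrittenSuccessfully guard introduced by this change.
  static boolean writeAndCommit(Path target, String line) {
    Path tmp = target.resolveSibling(
        target.getFileName() + "." + System.currentTimeMillis() + ".tmp");
    boolean writtenSuccessfully = true;
    // try-with-resources closes the writer even if write() throws, replacing
    // the old explicit finally/close block.
    try (BufferedWriter writer = Files.newBufferedWriter(tmp)) {
      writer.write(line);
    } catch (IOException e) {
      writtenSuccessfully = false; // leave the partial tmp file uncommitted
    }
    if (!writtenSuccessfully) {
      return false; // the broken tmp file is never renamed into place
    }
    try {
      // REPLACE_EXISTING mirrors Options.Rename.OVERWRITE in the HDFS version.
      Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(writeAndCommit(Path.of("record.json"), "{\"id\":1}"));
  }
}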

StateStoreFileImpl.java

@@ -31,6 +31,7 @@
 import java.util.List;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.slf4j.Logger;
@@ -125,7 +126,8 @@ protected <T extends BaseRecord> BufferedReader getReader(String filename) {
   }
 
   @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(String filename) {
+  @VisibleForTesting
+  public <T extends BaseRecord> BufferedWriter getWriter(String filename) {
     BufferedWriter writer = null;
     try {
       LOG.debug("Writing file: {}", filename);

StateStoreFileSystemImpl.java

@@ -28,13 +28,14 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
@@ -82,17 +83,8 @@ protected boolean mkdir(String path) {
   @Override
   protected boolean rename(String src, String dst) {
     try {
-      if (fs instanceof DistributedFileSystem) {
-        DistributedFileSystem dfs = (DistributedFileSystem)fs;
-        dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
-        return true;
-      } else {
-        // Replace should be atomic but not available
-        if (fs.exists(new Path(dst))) {
-          fs.delete(new Path(dst), true);
-        }
-        return fs.rename(new Path(src), new Path(dst));
-      }
+      FileUtil.rename(fs, new Path(src), new Path(dst), Options.Rename.OVERWRITE);
+      return true;
     } catch (Exception e) {
       LOG.error("Cannot rename {} to {}", src, dst, e);
       return false;
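FileUtil.rename(FileSystem, Path, Path, Options.Rename...) throws an IOException when the rename cannot be performed, so a failure now surfaces in the catch block rather than being hidden behind the old non-atomic exists()/delete()/rename() fallback, and it works for any FileSystem implementation, not just DistributedFileSystem. A minimal usage sketch (assuming a Hadoop client on the classpath; the paths and configuration here are hypothetical):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;

public class RenameSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    Path src = new Path("/statestore/record.tmp"); // hypothetical paths
    Path dst = new Path("/statestore/record");
    // Overwrites dst if it exists; throws IOException on failure.
    FileUtil.rename(fs, src, dst, Options.Rename.OVERWRITE);
  }
}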
@@ -148,7 +140,8 @@ protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
   }
 
   @Override
-  protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
+  @VisibleForTesting
+  public <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
     BufferedWriter writer = null;
     Path path = new Path(pathName);
     try {

TestStateStoreDriverBase.java

@@ -234,6 +234,25 @@ public <T extends BaseRecord> void testInsert(
     assertEquals(11, records2.size());
   }
 
+  public <T extends BaseRecord> void testInsertWithErrorDuringWrite(
+      StateStoreDriver driver, Class<T> recordClass)
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+
+    assertTrue(driver.removeAll(recordClass));
+    QueryResult<T> queryResult0 = driver.get(recordClass);
+    List<T> records0 = queryResult0.getRecords();
+    assertTrue(records0.isEmpty());
+
+    // Insert single
+    BaseRecord record = generateFakeRecord(recordClass);
+    driver.put(record, true, false);
+
+    // Verify that no record was inserted.
+    QueryResult<T> queryResult1 = driver.get(recordClass);
+    List<T> records1 = queryResult1.getRecords();
+    assertEquals(0, records1.size());
+  }
+
   public <T extends BaseRecord> void testFetchErrors(StateStoreDriver driver,
       Class<T> clazz) throws IllegalAccessException, IOException {

TestStateStoreFileSystem.java

@@ -17,16 +17,26 @@
  */
 package org.apache.hadoop.hdfs.server.federation.store.driver;
 
+import java.io.BufferedWriter;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
 
 /**
  * Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
@@ -91,4 +101,18 @@ public void testMetrics()
       throws IllegalArgumentException, IllegalAccessException, IOException {
     testMetrics(getStateStoreDriver());
   }
+
+  @Test
+  public void testInsertWithErrorDuringWrite()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    StateStoreFileBaseImpl driver = spy((StateStoreFileBaseImpl)getStateStoreDriver());
+    doAnswer((Answer<BufferedWriter>) a -> {
+      BufferedWriter writer = (BufferedWriter) a.callRealMethod();
+      BufferedWriter spyWriter = spy(writer);
+      doThrow(IOException.class).when(spyWriter).write(any(String.class));
+      return spyWriter;
+    }).when(driver).getWriter(any());
+
+    testInsertWithErrorDuringWrite(driver, MembershipState.class);
+  }
 }
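The test wraps the driver in a Mockito spy and overrides getWriter() (this is why the method was widened to public and annotated @VisibleForTesting) so that the returned BufferedWriter is itself a spy whose write(String) always throws an IOException. The shared testInsertWithErrorDuringWrite() helper then verifies that the failed put() leaves no record behind in the store.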