diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java index 8352bca12e..1ed9f38474 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -85,7 +85,8 @@ protected abstract BufferedReader getReader( * @param path Path of the record to write. * @return Writer for the record. */ - protected abstract BufferedWriter getWriter( + @VisibleForTesting + public abstract BufferedWriter getWriter( String path); /** @@ -348,25 +349,18 @@ public boolean putAll( for (Entry entry : toWrite.entrySet()) { String recordPath = entry.getKey(); String recordPathTemp = recordPath + "." + now() + TMP_MARK; - BufferedWriter writer = getWriter(recordPathTemp); - try { + boolean recordWrittenSuccessfully = true; + try (BufferedWriter writer = getWriter(recordPathTemp)) { T record = entry.getValue(); String line = serializeString(record); writer.write(line); } catch (IOException e) { LOG.error("Cannot write {}", recordPathTemp, e); + recordWrittenSuccessfully = false; success = false; - } finally { - if (writer != null) { - try { - writer.close(); - } catch (IOException e) { - LOG.error("Cannot close the writer for {}", recordPathTemp, e); - } - } } // Commit - if (!rename(recordPathTemp, recordPath)) { + if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) { LOG.error("Failed committing record into {}", recordPath); success = false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java index 9d2b1ab2fb..6ca2663716 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java @@ -31,6 +31,7 @@ import java.util.List; import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.slf4j.Logger; @@ -125,7 +126,8 @@ protected BufferedReader getReader(String filename) { } @Override - protected BufferedWriter getWriter(String filename) { + @VisibleForTesting + public BufferedWriter getWriter(String filename) { BufferedWriter writer = null; try { LOG.debug("Writing file: {}", filename); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java index e6bf159e2f..ee34d8a4ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java @@ -28,13 +28,14 @@ import java.util.Collections; import java.util.List; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; @@ -82,17 +83,8 @@ protected boolean mkdir(String path) { @Override protected boolean rename(String src, String dst) { try { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem)fs; - dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE); - return true; - } else { - // Replace should be atomic but not available - if (fs.exists(new Path(dst))) { - fs.delete(new Path(dst), true); - } - return fs.rename(new Path(src), new Path(dst)); - } + FileUtil.rename(fs, new Path(src), new Path(dst), Options.Rename.OVERWRITE); + return true; } catch (Exception e) { LOG.error("Cannot rename {} to {}", src, dst, e); return false; @@ -148,7 +140,8 @@ protected BufferedReader getReader(String pathName) { } @Override - protected BufferedWriter getWriter(String pathName) { + @VisibleForTesting + public BufferedWriter getWriter(String pathName) { BufferedWriter writer = null; Path path = new Path(pathName); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index fe1b9a5bfa..06b05f45bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -234,6 +234,25 @@ public void testInsert( assertEquals(11, records2.size()); } + public void testInsertWithErrorDuringWrite( + StateStoreDriver driver, Class recordClass) + throws IllegalArgumentException, IllegalAccessException, IOException { + + assertTrue(driver.removeAll(recordClass)); + QueryResult queryResult0 = driver.get(recordClass); + List records0 = queryResult0.getRecords(); + assertTrue(records0.isEmpty()); + + // Insert single + BaseRecord record = generateFakeRecord(recordClass); + driver.put(record, true, false); + + // Verify that no record was inserted. + QueryResult queryResult1 = driver.get(recordClass); + List records1 = queryResult1.getRecords(); + assertEquals(0, records1.size()); + } + public void testFetchErrors(StateStoreDriver driver, Class clazz) throws IllegalAccessException, IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java index 8c4b188cc4..dbd4b9bdae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java @@ -17,16 +17,26 @@ */ package org.apache.hadoop.hdfs.server.federation.store.driver; +import java.io.BufferedWriter; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + /** * Test the FileSystem (e.g., HDFS) implementation of the State Store driver. @@ -91,4 +101,18 @@ public void testMetrics() throws IllegalArgumentException, IllegalAccessException, IOException { testMetrics(getStateStoreDriver()); } + + @Test + public void testInsertWithErrorDuringWrite() + throws IllegalArgumentException, IllegalAccessException, IOException { + StateStoreFileBaseImpl driver = spy((StateStoreFileBaseImpl)getStateStoreDriver()); + doAnswer((Answer) a -> { + BufferedWriter writer = (BufferedWriter) a.callRealMethod(); + BufferedWriter spyWriter = spy(writer); + doThrow(IOException.class).when(spyWriter).write(any(String.class)); + return spyWriter; + }).when(driver).getWriter(any()); + + testInsertWithErrorDuringWrite(driver, MembershipState.class); + } } \ No newline at end of file