HDFS-13164. File not closed if streamer fails with DSQuotaExceededException.

Xiao Chen 2018-02-23 13:47:39 -08:00
parent 8e728f39c9
commit 51088d3233
3 changed files with 163 additions and 9 deletions
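Summary: before this change, a DataStreamer that died on a space-quota violation (e.g. DSQuotaExceededException while allocating a new block) caused DFSOutputStream.close() to rethrow without ever completing the file at the NameNode, so the file stayed under construction and its lease was held until the hard limit expired. The patch makes closeImpl() attempt completeFile() even on such failures before rethrowing. Below is a minimal sketch (not part of the commit) of the client-visible contract; `fs` and `dir` are assumed to point at a DistributedFileSystem directory whose space quota is already exhausted, mirroring the new TestQuota cases further down.

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;

public class QuotaCloseSketch {
  /** Write past the space quota of dir, then close the stream. */
  static void writeAndClose(FileSystem fs, Path dir) throws Exception {
    FSDataOutputStream out = fs.create(new Path(dir, "file"));
    out.write("whatever".getBytes());
    try {
      out.close();
    } catch (DSQuotaExceededException expected) {
      // The quota violation still surfaces to the caller, but with this
      // commit the file is no longer left under construction: closeImpl()
      // completes the file at the NameNode, releasing the lease.
    }
  }
}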

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -852,7 +852,19 @@ public void close() throws IOException {
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
-      getStreamer().getLastException().check(true);
+      LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
+          closed, getStreamer().streamerClosed());
+      try {
+        getStreamer().getLastException().check(true);
+      } catch (IOException ioe) {
+        cleanupAndRethrowIOException(ioe);
+      } finally {
+        if (!closed) {
+          // If stream is not closed but streamer closed, clean up the stream.
+          // Most importantly, end the file lease.
+          closeThreads(true);
+        }
+      }
       return;
     }
@@ -867,14 +879,12 @@ protected synchronized void closeImpl() throws IOException {
         setCurrentPacketToEmpty();
       }
-      flushInternal(); // flush all data to Datanodes
-      // get last block before destroying the streamer
-      ExtendedBlock lastBlock = getStreamer().getBlock();
-      try (TraceScope ignored =
-          dfsClient.getTracer().newScope("completeFile")) {
-        completeFile(lastBlock);
+      try {
+        flushInternal(); // flush all data to Datanodes
+      } catch (IOException ioe) {
+        cleanupAndRethrowIOException(ioe);
       }
+      completeFile();
     } catch (ClosedChannelException ignored) {
     } finally {
       // Failures may happen when flushing data.
@@ -886,6 +896,43 @@ protected synchronized void closeImpl() throws IOException {
     }
   }
 
+  private void completeFile() throws IOException {
+    // get last block before destroying the streamer
+    ExtendedBlock lastBlock = getStreamer().getBlock();
+    try (TraceScope ignored =
+        dfsClient.getTracer().newScope("completeFile")) {
+      completeFile(lastBlock);
+    }
+  }
+
+  /**
+   * Determines whether an IOException thrown needs extra cleanup on the
+   * stream. Space quota exceptions will be thrown when getting new blocks,
+   * so the open HDFS file needs to be closed.
+   *
+   * @param ioe the IOException
+   * @return whether the stream needs cleanup for the given IOException
+   */
+  private boolean exceptionNeedsCleanup(IOException ioe) {
+    return ioe instanceof DSQuotaExceededException
+        || ioe instanceof QuotaByStorageTypeExceededException;
+  }
+
+  private void cleanupAndRethrowIOException(IOException ioe)
+      throws IOException {
+    if (exceptionNeedsCleanup(ioe)) {
+      final MultipleIOException.Builder b = new MultipleIOException.Builder();
+      b.add(ioe);
+      try {
+        completeFile();
+      } catch (IOException e) {
+        b.add(e);
+        throw b.build();
+      }
+    }
+    throw ioe;
+  }
+
   // should be called holding (this) lock since setTestFilename() may
   // be called during unit tests
   protected void completeFile(ExtendedBlock last) throws IOException {
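Note how cleanupAndRethrowIOException keeps both failures visible when completeFile() itself fails during cleanup. Here is a standalone sketch of that error-combining pattern, using the same org.apache.hadoop.io.MultipleIOException builder the patch relies on; the IOCleanup interface and the method names are illustrative, not part of the commit.

import java.io.IOException;
import org.apache.hadoop.io.MultipleIOException;

public class RethrowSketch {
  interface IOCleanup {
    void run() throws IOException;
  }

  // Mirrors cleanupAndRethrowIOException: attempt a cleanup action and,
  // if it also fails, surface both exceptions as one MultipleIOException;
  // otherwise rethrow only the original failure.
  static void rethrowWithCleanup(IOException primary, IOCleanup cleanup)
      throws IOException {
    final MultipleIOException.Builder b = new MultipleIOException.Builder();
    b.add(primary);
    try {
      cleanup.run();
    } catch (IOException cleanupFailure) {
      b.add(cleanupFailure);
      throw b.build(); // one exception carrying both causes
    }
    throw primary; // cleanup succeeded; report the original failure alone
  }
}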

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java

@@ -74,7 +74,7 @@
  */
 @InterfaceAudience.Private
 public class LeaseRenewer {
-  static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
+  public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
 
   private static long leaseRenewerGraceDefault = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
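The one-line LeaseRenewer change widens LOG from package-private to public so that tests outside org.apache.hadoop.hdfs.client.impl can raise the renewer's verbosity, as testSpaceQuotaExceptionOnFlush below does. A minimal usage sketch, assuming the Hadoop test utilities on the classpath:

import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.test.GenericTestUtils;
import org.slf4j.event.Level;

public class RenewerLogSketch {
  public static void main(String[] args) {
    // Compiles from another package only because LeaseRenewer.LOG is public.
    GenericTestUtils.setLogLevel(LeaseRenewer.LOG, Level.TRACE);
  }
}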

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java

@@ -35,6 +35,7 @@
 import java.util.List;
 import java.util.Scanner;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -58,13 +60,20 @@
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+import org.slf4j.LoggerFactory;
 
 /** A class for testing quota-related commands */
 public class TestQuota {
+  private static final Logger LOG = LoggerFactory.getLogger(TestQuota.class);
+
   private static Configuration conf = null;
   private static final ByteArrayOutputStream OUT_STREAM = new ByteArrayOutputStream();
@@ -77,6 +86,9 @@ public class TestQuota {
   /* set a smaller block size so that we can test with smaller space quotas */
   private static final int DEFAULT_BLOCK_SIZE = 512;
 
+  @Rule
+  public final Timeout testTimeout = new Timeout(120000);
+
   @BeforeClass
   public static void setUpClass() throws Exception {
     conf = new HdfsConfiguration();
@@ -1479,6 +1491,101 @@ public void testSetAndClearSpaceQuotaNoAccess() throws Exception {
         "clrSpaceQuota");
   }
 
+  @Test
+  public void testSpaceQuotaExceptionOnClose() throws Exception {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final Path dir = new Path(PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(dir));
+    final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
+    assertEquals(0, ToolRunner.run(dfsAdmin, args));
+
+    final Path testFile = new Path(dir, "file");
+    final FSDataOutputStream stream = dfs.create(testFile);
+    stream.write("whatever".getBytes());
+    try {
+      stream.close();
+      fail("close should fail");
+    } catch (DSQuotaExceededException expected) {
+    }
+
+    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+  }
+
+  @Test
+  public void testSpaceQuotaExceptionOnFlush() throws Exception {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final Path dir = new Path(PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(dir));
+    final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
+    assertEquals(0, ToolRunner.run(dfsAdmin, args));
+
+    Path testFile = new Path(dir, "file");
+    FSDataOutputStream stream = dfs.create(testFile);
+    // get the lease renewer now so we can verify it later without calling
+    // getLeaseRenewer, which will automatically add the client into it.
+    final LeaseRenewer leaseRenewer = dfs.getClient().getLeaseRenewer();
+    stream.write("whatever".getBytes());
+    try {
+      stream.hflush();
+      fail("flush should fail");
+    } catch (DSQuotaExceededException expected) {
+    }
+    // even if we close the stream in a finally block, it won't help.
+    try {
+      stream.close();
+      fail("close should fail too");
+    } catch (DSQuotaExceededException expected) {
+    }
+
+    GenericTestUtils.setLogLevel(LeaseRenewer.LOG, Level.TRACE);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("LeaseRenewer: {}", leaseRenewer);
+        return leaseRenewer.isEmpty();
+      }
+    }, 100, 10000);
+    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+  }
+
+  @Test
+  public void testSpaceQuotaExceptionOnAppend() throws Exception {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final Path dir = new Path(PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    dfs.delete(dir, true);
+    assertTrue(dfs.mkdirs(dir));
+    final String[] args =
+        new String[] {"-setSpaceQuota", "4000", dir.toString()};
+    ToolRunner.run(dfsAdmin, args);
+
+    final Path testFile = new Path(dir, "file");
+    OutputStream stream = dfs.create(testFile);
+    stream.write("whatever".getBytes());
+    stream.close();
+    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+
+    stream = dfs.append(testFile);
+    byte[] buf = AppendTestUtil.initBuffer(4096);
+    stream.write(buf);
+    try {
+      stream.close();
+      fail("close after append should fail");
+    } catch (DSQuotaExceededException expected) {
+    }
+    assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+  }
+
   private void testSetAndClearSpaceQuotaNoAccessInternal(
       final String[] args,
       final int cmdRet,