HDFS-14694. Call recoverLease on DFSOutputStream close exception. Contributed by Lisheng Sun.

Co-authored-by: Chen Zhang <chzhang1987@gmail.com>

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
He Xiaoqiao 2020-09-09 19:50:06 +08:00
parent 43572fc7f8
commit 1d6d0d8207
3 changed files with 107 additions and 1 deletion

View File

@@ -75,6 +75,8 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
/**************************************************************** /****************************************************************
* DFSOutputStream creates files from a stream of bytes. * DFSOutputStream creates files from a stream of bytes.
@@ -126,6 +128,7 @@ public class DFSOutputStream extends FSOutputSummer
protected final AtomicReference<CachingStrategy> cachingStrategy; protected final AtomicReference<CachingStrategy> cachingStrategy;
private FileEncryptionInfo fileEncryptionInfo; private FileEncryptionInfo fileEncryptionInfo;
private int writePacketSize; private int writePacketSize;
private boolean leaseRecovered = false;
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
protected DFSPacket createPacket(int packetSize, int chunksPerPkt, protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -861,7 +864,14 @@ public void close() throws IOException {
} }
protected synchronized void closeImpl() throws IOException { protected synchronized void closeImpl() throws IOException {
boolean recoverLeaseOnCloseException = dfsClient.getConfiguration()
.getBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY,
RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT);
if (isClosed()) { if (isClosed()) {
if (!leaseRecovered) {
recoverLease(recoverLeaseOnCloseException);
}
LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]", LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
closed, getStreamer().streamerClosed()); closed, getStreamer().streamerClosed());
try { try {
@@ -896,6 +906,9 @@ protected synchronized void closeImpl() throws IOException {
} }
completeFile(); completeFile();
} catch (ClosedChannelException ignored) { } catch (ClosedChannelException ignored) {
} catch (IOException ioe) {
recoverLease(recoverLeaseOnCloseException);
throw ioe;
} finally { } finally {
// Failures may happen when flushing data. // Failures may happen when flushing data.
// Streamers may keep waiting for the new block information. // Streamers may keep waiting for the new block information.
@@ -906,7 +919,23 @@ protected synchronized void closeImpl() throws IOException {
} }
} }
private void completeFile() throws IOException { /**
* If recoverLeaseOnCloseException is true and an exception occurs when
* closing a file, recover lease.
*/
private void recoverLease(boolean recoverLeaseOnCloseException) {
// No-op unless dfs.client.write.recover.lease.on.close.exception was
// enabled in the client configuration (read in closeImpl()).
if (recoverLeaseOnCloseException) {
try {
// Release the client-side lease bookkeeping for this file first,
// then ask the NameNode to begin lease recovery on the path.
dfsClient.endFileLease(fileId);
dfsClient.recoverLease(src);
// Mark recovery as done so closeImpl() does not retry it on a
// second close() of an already-closed stream.
leaseRecovered = true;
} catch (Exception e) {
// Best-effort: a failure here must not mask the original close()
// exception that triggered recovery, so only warn and continue.
LOG.warn("Fail to recover lease for {}", src, e);
}
}
}
void completeFile() throws IOException {
// get last block before destroying the streamer // get last block before destroying the streamer
ExtendedBlock lastBlock = getStreamer().getBlock(); ExtendedBlock lastBlock = getStreamer().getBlock();
try (TraceScope ignored = try (TraceScope ignored =
@@ -1076,6 +1105,11 @@ public String toString() {
return getClass().getSimpleName() + ":" + streamer; return getClass().getSimpleName() + ":" + streamer;
} }
/**
 * @return true if lease recovery was successfully triggered for this
 *         stream after a close() failure; exposed for tests only.
 */
@VisibleForTesting
boolean isLeaseRecovered() {
return leaseRecovered;
}
static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId, DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags) String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)

View File

@@ -362,6 +362,9 @@ interface Write {
String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY = String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
PREFIX + "exclude.nodes.cache.expiry.interval.millis"; PREFIX + "exclude.nodes.cache.expiry.interval.millis";
long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE; long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
String RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY =
PREFIX + "recover.lease.on.close.exception";
boolean RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT = false;
interface ByteArrayManager { interface ByteArrayManager {
String PREFIX = Write.PREFIX + "byte-array-manager."; String PREFIX = Write.PREFIX + "byte-array-manager.";

View File

@@ -30,6 +30,7 @@
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
@@ -62,7 +63,10 @@
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
import org.mockito.Mockito; import org.mockito.Mockito;
@@ -371,10 +375,75 @@ public void testStreamFlush() throws Exception {
os.close(); os.close();
} }
/**
 * When the recover-lease-on-close-exception key is enabled and
 * completeFile() throws during close(), the stream should trigger lease
 * recovery so the file eventually reaches the closed state on the NameNode.
 */
@Test
public void testExceptionInCloseWithRecoverLease() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, true);
DFSClient client =
new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
DFSClient spyClient = Mockito.spy(client);
DFSOutputStream dfsOutputStream = spyClient.create(
"/testExceptionInCloseWithRecoverLease", FsPermission.getFileDefault(),
EnumSet.of(CreateFlag.CREATE), (short) 3, 1024, null, 1024, null);
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
// Force close() to fail at the completeFile() step.
doThrow(new IOException("Emulated IOException in close"))
.when(spyDFSOutputStream).completeFile();
try {
spyDFSOutputStream.close();
// close() must rethrow the emulated IOException.
fail();
} catch (IOException ioe) {
// Lease recovery should have been attempted and recorded ...
assertTrue(spyDFSOutputStream.isLeaseRecovered());
// ... and the NameNode should eventually mark the file closed.
waitForFileClosed("/testExceptionInCloseWithRecoverLease");
assertTrue(isFileClosed("/testExceptionInCloseWithRecoverLease"));
}
}
/**
 * With the recover-lease-on-close-exception key left at its default
 * (false), a completeFile() failure during close() must NOT trigger lease
 * recovery, so the file is expected to stay open on the NameNode.
 */
@Test
public void testExceptionInCloseWithoutRecoverLease() throws Exception {
Configuration conf = new Configuration();
DFSClient client =
new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
DFSClient spyClient = Mockito.spy(client);
DFSOutputStream dfsOutputStream =
spyClient.create("/testExceptionInCloseWithoutRecoverLease",
FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
(short) 3, 1024, null, 1024, null);
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
// Force close() to fail at the completeFile() step.
doThrow(new IOException("Emulated IOException in close"))
.when(spyDFSOutputStream).completeFile();
try {
spyDFSOutputStream.close();
// close() must rethrow the emulated IOException.
fail();
} catch (IOException ioe) {
// Recovery is disabled, so no lease recovery should be recorded.
assertFalse(spyDFSOutputStream.isLeaseRecovered());
try {
waitForFileClosed("/testExceptionInCloseWithoutRecoverLease");
} catch (TimeoutException e) {
// Expected: the wait times out because the file never closes.
assertFalse(isFileClosed("/testExceptionInCloseWithoutRecoverLease"));
}
}
}
@AfterClass @AfterClass
public static void tearDown() { public static void tearDown() {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
} }
// Asks the cluster's NameNode whether the file at {@code path} has been
// fully closed (all blocks complete and lease released).
private boolean isFileClosed(String path) throws IOException {
return cluster.getFileSystem().isFileClosed(new Path(path));
}
// Polls isFileClosed(path) every 1s for up to 5s; throws TimeoutException
// (via GenericTestUtils.waitFor) if the file never reaches the closed state.
private void waitForFileClosed(String path) throws Exception {
GenericTestUtils.waitFor(() -> {
boolean closed;
try {
closed = isFileClosed(path);
} catch (IOException e) {
// Transient RPC failure: treat as "not closed yet" and keep polling.
return false;
}
return closed;
}, 1000, 5000);
}
} }