From 7bebad61d9c3dbff81fdcf243585fd3e9ae59dde Mon Sep 17 00:00:00 2001
From: Stephen O'Donnell
Date: Thu, 29 Aug 2019 17:37:05 -0700
Subject: [PATCH] HDFS-14706. Checksums are not checked if block meta file is
 less than 7 bytes. Contributed by Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang
---
 .../apache/hadoop/hdfs/DFSOutputStream.java   | 72 ++++++++++++++++++-
 .../hdfs/client/HdfsClientConfigKeys.java     |  3 +
 .../hadoop/hdfs/TestDFSOutputStream.java      | 37 ++++++++++
 3 files changed, 110 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 4a0d75e260..b7f2ff9d2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -23,6 +23,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.EnumSet;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -126,6 +127,8 @@ public class DFSOutputStream extends FSOutputSummer
   protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
   private int writePacketSize;
+  private boolean leaseRecovered = false;
+  private boolean exceptionInClose = false; // for unit test
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -836,6 +839,39 @@ protected void closeThreads(boolean force) throws IOException {
     }
   }
 
+  @VisibleForTesting
+  public void setExceptionInClose(boolean enable) {
+    exceptionInClose = enable;
+  }
+
+  private class EmulateExceptionInClose {
+    private Random rand = null;
+    private int kickedNum;
+
+    EmulateExceptionInClose(int callNum) {
+      if (exceptionInClose) {
+        rand = new Random();
+      }
+      kickedNum = callNum;
+    }
+
+    void kickRandomException() throws IOException {
+      if (exceptionInClose) {
+        if (kickedNum > 0) {
+          if (rand.nextInt(kickedNum) == 1) {
+            throw new IOException("Emulated random IOException in close");
+          }
+        }
+      }
+    }
+
+    void kickException() throws IOException {
+      if (exceptionInClose) {
+        throw new IOException("Emulated IOException in close");
+      }
+    }
+  }
+
   /**
    * Closes this output stream and releases any system
    * resources associated with this stream.
@@ -858,7 +894,20 @@ public void close() throws IOException {
   }
 
   protected synchronized void closeImpl() throws IOException {
+    boolean recoverOnCloseException =
+        dfsClient.getConfiguration().getBoolean(
+            HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY,
+            HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_DEFAULT);
     if (isClosed()) {
+      if (recoverOnCloseException && !leaseRecovered) {
+        try {
+          dfsClient.endFileLease(fileId);
+          dfsClient.recoverLease(src);
+          leaseRecovered = true;
+        } catch (Exception e) {
+          LOG.warn("Failed to recover lease for {}", src, e);
+        }
+      }
       LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
           closed, getStreamer().streamerClosed());
       try {
@@ -875,8 +924,11 @@ protected synchronized void closeImpl() throws IOException {
       return;
     }
 
+    EmulateExceptionInClose eei = new EmulateExceptionInClose(5);
     try {
-      flushBuffer();       // flush from all upper layers
+      flushBuffer(); // flush from all upper layers
+      // for test
+      eei.kickRandomException();
 
       if (currentPacket != null) {
         enqueueCurrentPacket();
@@ -887,12 +939,28 @@ protected synchronized void closeImpl() throws IOException {
       }
 
       try {
-        flushInternal();             // flush all data to Datanodes
+        flushInternal(); // flush all data to Datanodes
       } catch (IOException ioe) {
         cleanupAndRethrowIOException(ioe);
       }
+      // for test
+      eei.kickRandomException();
       completeFile();
+      // for test
+      eei.kickException();
     } catch (ClosedChannelException ignored) {
+    } catch (IOException ioe) {
+      if (recoverOnCloseException) {
+        try {
+          dfsClient.endFileLease(fileId);
+          dfsClient.recoverLease(src);
+          leaseRecovered = true;
+        } catch (Exception e) {
+          // Ignore the exception raised by recoverLease and rethrow the
+          // original exception.
+        }
+      }
+      throw ioe;
     } finally {
       // Failures may happen when flushing data.
       // Streamers may keep waiting for the new block information.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 97a8472b53..2e2e4a6df4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -307,6 +307,9 @@ interface Write {
     String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
         PREFIX + "exclude.nodes.cache.expiry.interval.millis";
     long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
+    String RECOVER_ON_CLOSE_EXCEPTION_KEY =
+        PREFIX + "recover.on.close.exception";
+    boolean RECOVER_ON_CLOSE_EXCEPTION_DEFAULT = false;
 
     interface ByteArrayManager {
       String PREFIX = Write.PREFIX + "byte-array-manager.";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 1891956dc0..57510a6acb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -371,6 +373,41 @@ public void testStreamFlush() throws Exception {
     os.close();
   }
 
+  /**
+   * If dfs.client.write.recover.on.close.exception is set and an exception
+   * occurs in close, the local lease should be released and the lease on
+   * the namenode should be recovered.
+   */
+  @Test
+  public void testExceptionInClose() throws Exception {
+    String testStr = "Test exception in close";
+    DistributedFileSystem fs = cluster.getFileSystem();
+    Path testFile = new Path("/closeexception");
+    fs.getConf().setBoolean(
+        HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY, true);
+    FSDataOutputStream os = fs.create(testFile);
+    DFSOutputStream dos =
+        (DFSOutputStream) FieldUtils.readField(os, "wrappedStream", true);
+    dos.setExceptionInClose(true);
+    os.write(testStr.getBytes());
+    try {
+      dos.close();
+      // close() is expected to throw an IOException
+      Assert.fail();
+    } catch (IOException ioe) {
+      GenericTestUtils.waitFor(() -> {
+        boolean closed;
+        try {
+          closed = fs.isFileClosed(testFile);
+        } catch (IOException e) {
+          return false;
+        }
+        return closed;
+      }, 1000, 5000);
+      Assert.assertTrue(fs.isFileClosed(testFile));
+    }
+  }
+
   @AfterClass
   public static void tearDown() {
     if (cluster != null) {
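
Usage sketch (not part of the patch): the new option defaults to false, so a
client has to opt in before close()-time lease recovery takes effect. A
minimal example, assuming the key added above resolves to
"dfs.client.write.recover.on.close.exception" (Write.PREFIX +
"recover.on.close.exception"); the class name, path, and payload below are
illustrative only.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RecoverOnCloseExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Opt in: if close() throws an IOException, the client drops its
        // local lease and asks the NameNode to recover the file's lease,
        // instead of leaving the file open under a lease no one will renew.
        conf.setBoolean("dfs.client.write.recover.on.close.exception", true);

        try (FileSystem fs = FileSystem.get(conf)) {
          FSDataOutputStream out = fs.create(new Path("/tmp/recover-example"));
          out.write("hello".getBytes(StandardCharsets.UTF_8));
          // If this close() fails, lease recovery is initiated before the
          // IOException is rethrown, so the NameNode can eventually close
          // the file and unblock later readers and writers.
          out.close();
        }
      }
    }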