From 16c6755554cc5ecd9d4e0ba74b75b10c74bb0ab4 Mon Sep 17 00:00:00 2001
From: Brandon Li
Date: Thu, 7 Nov 2013 18:02:37 +0000
Subject: [PATCH] HDFS-5252. Stable write is not handled correctly in some
 places. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1539740 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/nfs/nfs3/request/READ3Request.java |  17 +++
 .../hadoop/hdfs/nfs/nfs3/Nfs3Utils.java       |   6 +
 .../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java     |  17 +++
 .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java  |   7 +
 .../hadoop/hdfs/nfs/nfs3/WriteManager.java    |   6 +
 .../hadoop/hdfs/nfs/nfs3/TestWrites.java      | 135 +++++++++++++++++-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   2 +
 7 files changed, 189 insertions(+), 1 deletion(-)

diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/READ3Request.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/READ3Request.java
index c676d8b5ea..6d95f5e9f8 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/READ3Request.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/READ3Request.java
@@ -19,8 +19,11 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.oncrpc.XDR;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * READ3 Request
  */
@@ -34,6 +37,13 @@ public READ3Request(XDR xdr) throws IOException {
     count = xdr.readInt();
   }
 
+  @VisibleForTesting
+  public READ3Request(FileHandle handle, long offset, int count) {
+    super(handle);
+    this.offset = offset;
+    this.count = count;
+  }
+
   public long getOffset() {
     return this.offset;
   }
@@ -41,4 +51,11 @@ public long getOffset() {
   public int getCount() {
     return this.count;
   }
+
+  @Override
+  public void serialize(XDR xdr) {
+    handle.serialize(xdr);
+    xdr.writeLongAsHyper(offset);
+    xdr.writeInt(count);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
index a8def77731..f3599a5beb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
@@ -109,6 +109,12 @@ public static WccData createWccData(final WccAttr preOpAttr,
    * Send a write response to the netty network socket channel
    */
   public static void writeChannel(Channel channel, XDR out, int xid) {
+    if (channel == null) {
+      RpcProgramNfs3.LOG
+          .info("Null channel should only happen in tests. Do nothing.");
Do nothing."); + return; + } + if (RpcProgramNfs3.LOG.isDebugEnabled()) { RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 159783598f..a6fb97bac5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -1007,6 +1007,23 @@ private void doSingleWrite(final WriteCtx writeCtx) { } if (!writeCtx.getReplied()) { + if (stableHow != WriteStableHow.UNSTABLE) { + LOG.info("Do sync for stable write:" + writeCtx); + try { + if (stableHow == WriteStableHow.DATA_SYNC) { + fos.hsync(); + } else { + Preconditions.checkState(stableHow == WriteStableHow.FILE_SYNC, + "Unknown WriteStableHow:" + stableHow); + // Sync file data and length + fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); + } + } catch (IOException e) { + LOG.error("hsync failed with writeCtx:" + writeCtx + " error:" + e); + throw e; + } + } + WccAttr preOpAttr = latestAttr.getWccAttr(); WccData fileWcc = new WccData(preOpAttr, latestAttr); if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 612171e084..7ee9f06766 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -126,6 +126,8 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; +import com.google.common.annotations.VisibleForTesting; + /** * RPC program corresponding to nfs daemon. See {@link Nfs3}. */ @@ -1975,4 +1977,9 @@ private boolean checkAccessPrivilege(final InetAddress client, } return true; } + + @VisibleForTesting + WriteManager getWriteManager() { + return this.writeManager; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java index 0989c82c42..cb8bf86569 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java @@ -45,6 +45,7 @@ import org.apache.hadoop.util.Daemon; import org.jboss.netty.channel.Channel; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; /** @@ -262,6 +263,11 @@ Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle, } return attr; } + + @VisibleForTesting + ConcurrentMap getOpenFileMap() { + return this.openFileMap; + } /** * StreamMonitor wakes up periodically to find and closes idle streams. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
index 3626728bdc..30b9395959 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -17,21 +17,41 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
+import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
+import org.apache.hadoop.nfs.nfs3.request.READ3Request;
+import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
+import org.apache.hadoop.nfs.nfs3.response.READ3Response;
+import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.SecurityHandler;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -105,7 +125,7 @@ public void testAlterWriteRequest() throws IOException {
     Assert.assertTrue(limit - position == 1);
     Assert.assertTrue(appendedData.get(position) == (byte) 19);
   }
-  
+
   @Test
   // Validate all the commit check return codes of OpenFileCtx.COMMIT_STATUS,
   // which include COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
@@ -162,4 +182,117 @@ public void testCheckCommit() throws IOException {
     ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
   }
+
+  private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
+      throws InterruptedException {
+    int waitedTime = 0;
+    ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
+        .getOpenFileMap();
+    OpenFileCtx ctx = openFileMap.get(handle);
+    assertTrue(ctx != null);
+    do {
+      Thread.sleep(3000);
+      waitedTime += 3000;
+      if (ctx.getPendingWritesForTest().size() == 0) {
+        return;
+      }
+    } while (waitedTime < maxWaitTime);
+
+    fail("Write did not finish.");
+  }
+
+  @Test
+  public void testWriteStableHow() throws IOException, InterruptedException {
+    HdfsConfiguration config = new HdfsConfiguration();
+    DFSClient client = null;
+    MiniDFSCluster cluster = null;
+    RpcProgramNfs3 nfsd;
+    SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
+    Mockito.when(securityHandler.getUser()).thenReturn(
+        System.getProperty("user.name"));
+
+    try {
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      client = new DFSClient(NameNode.getAddress(config), config);
+
+      // Start nfs
+      List<String> exports = new ArrayList<String>();
+      exports.add("/");
+      Nfs3 nfs3 = new Nfs3(exports, config);
+      nfs3.start(false);
+      nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
+
+      HdfsFileStatus status = client.getFileInfo("/");
+      FileHandle rootHandle = new FileHandle(status.getFileId());
+      // Create file1
+      CREATE3Request createReq = new CREATE3Request(rootHandle, "file1",
+          Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+      XDR createXdr = new XDR();
+      createReq.serialize(createXdr);
+      CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
+          securityHandler, InetAddress.getLocalHost());
+      FileHandle handle = createRsp.getObjHandle();
+
+      // Test DATA_SYNC
+      byte[] buffer = new byte[10];
+      for (int i = 0; i < 10; i++) {
+        buffer[i] = (byte) i;
+      }
+
+      WRITE3Request writeReq = new WRITE3Request(handle, 0, 10,
+          WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
+      XDR writeXdr = new XDR();
+      writeReq.serialize(writeXdr);
+      nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
+          InetAddress.getLocalHost());
+
+      waitWrite(nfsd, handle, 60000);
+
+      // Readback
+      READ3Request readReq = new READ3Request(handle, 0, 10);
+      XDR readXdr = new XDR();
+      readReq.serialize(readXdr);
+      READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
+          securityHandler, InetAddress.getLocalHost());
+
+      assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
+
+      // Test FILE_SYNC
+
+      // Create file2
+      CREATE3Request createReq2 = new CREATE3Request(rootHandle, "file2",
+          Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+      XDR createXdr2 = new XDR();
+      createReq2.serialize(createXdr2);
+      CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
+          securityHandler, InetAddress.getLocalHost());
+      FileHandle handle2 = createRsp2.getObjHandle();
+
+      WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
+          WriteStableHow.FILE_SYNC, ByteBuffer.wrap(buffer));
+      XDR writeXdr2 = new XDR();
+      writeReq2.serialize(writeXdr2);
+      nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
+          InetAddress.getLocalHost());
+
+      waitWrite(nfsd, handle2, 60000);
+
+      // Readback
+      READ3Request readReq2 = new READ3Request(handle2, 0, 10);
+      XDR readXdr2 = new XDR();
+      readReq2.serialize(readXdr2);
+      READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
+          securityHandler, InetAddress.getLocalHost());
+
+      assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
+      // FILE_SYNC should sync the file size
+      status = client.getFileInfo("/file2");
+      assertTrue(status.getLen() == 10);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9fedc0ed02..614ed40fed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -588,6 +588,8 @@ Release 2.2.1 - UNRELEASED
     HDFS-5458. Datanode failed volume threshold ignored if exception is thrown
     in getDataDirsFromURIs. (Mike Mellenthin via wang)
 
+    HDFS-5252. Stable write is not handled correctly in some places. (brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES
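
Background for the OpenFileCtx hunk above: an NFSv3 WRITE request carries a
stable_how field, and a server that replies to a DATA_SYNC or FILE_SYNC write
promises the data is durable before the reply is sent; replying first (the
pre-patch behavior) breaks that contract. The sketch below distills the rule
the patch enforces into a stand-alone helper. It is an illustration only, not
part of the patch: HdfsDataOutputStream, SyncFlag, and WriteStableHow are the
real types used above, while the class name StableWriteContract and the method
syncBeforeReply are invented for this sketch.

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;

class StableWriteContract {
  /**
   * Make written data as durable as the WRITE request's stable_how field
   * demands, before any reply goes back to the NFS client. (Hypothetical
   * helper; the patch performs these calls inline in doSingleWrite.)
   */
  static void syncBeforeReply(HdfsDataOutputStream fos, WriteStableHow stableHow)
      throws IOException {
    switch (stableHow) {
    case UNSTABLE:
      // No durability promise; the client is expected to send a COMMIT later.
      break;
    case DATA_SYNC:
      // Flush the written bytes to the datanodes' disks.
      fos.hsync();
      break;
    case FILE_SYNC:
      // Flush the bytes and also persist the file length on the NameNode,
      // which is why testWriteStableHow checks getLen() only for FILE_SYNC.
      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
      break;
    }
  }
}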