HDFS-8070. Pre-HDFS-7915 DFSClient cannot use short circuit on post-HDFS-7915 DataNode (cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2015-04-23 19:03:44 -07:00
parent a0e0a63209
commit a8898445dc
4 changed files with 54 additions and 2 deletions

View File

@ -589,6 +589,9 @@ Release 2.7.1 - UNRELEASED
HDFS-8147. StorageGroup in Dispatcher should override equals nad hashCode. HDFS-8147. StorageGroup in Dispatcher should override equals nad hashCode.
(surendra singh lilhore via szetszwo) (surendra singh lilhore via szetszwo)
HDFS-8070. Pre-HDFS-7915 DFSClient cannot use short circuit on
post-HDFS-7915 DataNode (cmccabe)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -77,6 +77,9 @@ public static class FailureInjector {
public void injectRequestFileDescriptorsFailure() throws IOException { public void injectRequestFileDescriptorsFailure() throws IOException {
// do nothing // do nothing
} }
public boolean getSupportsReceiptVerification() {
return true;
}
} }
@VisibleForTesting @VisibleForTesting
@ -533,7 +536,8 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
final DataOutputStream out = final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
SlotId slotId = slot == null ? null : slot.getSlotId(); SlotId slotId = slot == null ? null : slot.getSlotId();
new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true); new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
failureInjector.getSupportsReceiptVerification());
DataInputStream in = new DataInputStream(peer.getInputStream()); DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in)); PBHelper.vintPrefixed(in));

View File

@ -186,7 +186,8 @@ private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
try { try {
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()), PBHelper.convert(proto.getHeader().getToken()),
slotId, proto.getMaxVersion(), true); slotId, proto.getMaxVersion(),
proto.getSupportsReceiptVerification());
} finally { } finally {
if (traceScope != null) traceScope.close(); if (traceScope != null) traceScope.close();
} }

View File

@ -743,4 +743,48 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
cluster.shutdown(); cluster.shutdown();
sockDir.close(); sockDir.close();
} }
public static class TestPreReceiptVerificationFailureInjector
extends BlockReaderFactory.FailureInjector {
@Override
public boolean getSupportsReceiptVerification() {
return false;
}
}
// Regression test for HDFS-8070
@Test(timeout=60000)
public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testPreReceiptVerificationDfsClientCanDoScr", sockDir);
conf.setLong(
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
fs.getClient().getConf().getShortCircuitConf().brfFailureInjector =
new TestPreReceiptVerificationFailureInjector();
final Path TEST_PATH1 = new Path("/test_file1");
DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
final Path TEST_PATH2 = new Path("/test_file2");
DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
ShortCircuitRegistry registry =
cluster.getDataNodes().get(0).getShortCircuitRegistry();
registry.visit(new ShortCircuitRegistry.Visitor() {
@Override
public void accept(HashMap<ShmId, RegisteredShm> segments,
HashMultimap<ExtendedBlockId, Slot> slots) {
Assert.assertEquals(1, segments.size());
Assert.assertEquals(2, slots.size());
}
});
cluster.shutdown();
sockDir.close();
}
} }