diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java index b021ea2b6c..946ab5aa8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java @@ -22,6 +22,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.URI; + +import org.apache.commons.io.input.BoundedInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -98,7 +100,8 @@ public InputStream getDataInputStream(long seekOffset) throws IOException { if (remoteFS != null) { FSDataInputStream ins = remoteFS.open(new Path(fileURI)); ins.seek(fileOffset + seekOffset); - return new FSDataInputStream(ins); + return new BoundedInputStream( + new FSDataInputStream(ins), getBlockDataLength()); } else { throw new IOException("Remote filesystem for provided replica " + this + " does not exist"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java new file mode 100644 index 0000000000..8258c21cff --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.input.BoundedInputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests the implementation of {@link ProvidedReplica}. + */ +public class TestProvidedReplicaImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(TestProvidedReplicaImpl.class); + private static final String BASE_DIR = + new FileSystemTestHelper().getTestRootDir(); + private static final String FILE_NAME = "provided-test"; + //length of the file that is associated with the provided blocks. + private static final long FILE_LEN = 128 * 1024 * 10L + 64 * 1024; + //length of each provided block. + private static final long BLK_LEN = 128 * 1024L; + + private static List replicas; + + private static void createFileIfNotExists(String baseDir) throws IOException { + File newFile = new File(baseDir, FILE_NAME); + newFile.getParentFile().mkdirs(); + if(!newFile.exists()) { + newFile.createNewFile(); + OutputStream writer = new FileOutputStream(newFile.getAbsolutePath()); + //FILE_LEN is length in bytes. + byte[] bytes = new byte[1]; + bytes[0] = (byte) 0; + for(int i=0; i< FILE_LEN; i++) { + writer.write(bytes); + } + writer.flush(); + writer.close(); + LOG.info("Created provided file " + newFile + + " of length " + newFile.length()); + } + } + + private static void createProvidedReplicas(Configuration conf) { + long numReplicas = (long) Math.ceil((double) FILE_LEN/BLK_LEN); + File providedFile = new File(BASE_DIR, FILE_NAME); + replicas = new ArrayList(); + + LOG.info("Creating " + numReplicas + " provided replicas"); + for (int i=0; i= (i+1)*BLK_LEN ? BLK_LEN : FILE_LEN - i*BLK_LEN; + replicas.add( + new FinalizedProvidedReplica(i, providedFile.toURI(), i*BLK_LEN, + currentReplicaLength, 0, null, conf)); + } + } + + @Before + public void setUp() throws IOException { + createFileIfNotExists(new File(BASE_DIR).getAbsolutePath()); + createProvidedReplicas(new Configuration()); + } + + /** + * Checks if {@code ins} matches the provided file from offset + * {@code fileOffset} for length {@ dataLength}. + * @param file the local file + * @param ins input stream to compare against + * @param fileOffset offset + * @param dataLength length + * @throws IOException + */ + private void verifyReplicaContents(File file, + InputStream ins, long fileOffset, long dataLength) + throws IOException { + + InputStream fileIns = new FileInputStream(file); + fileIns.skip(fileOffset); + + try (ReadableByteChannel i = + Channels.newChannel(new BoundedInputStream(fileIns, dataLength))) { + try (ReadableByteChannel j = Channels.newChannel(ins)) { + ByteBuffer ib = ByteBuffer.allocate(4096); + ByteBuffer jb = ByteBuffer.allocate(4096); + while (true) { + int il = i.read(ib); + int jl = j.read(jb); + if (il < 0 || jl < 0) { + assertEquals(il, jl); + break; + } + ib.flip(); + jb.flip(); + int cmp = Math.min(ib.remaining(), jb.remaining()); + for (int k = 0; k < cmp; ++k) { + assertEquals(ib.get(), jb.get()); + } + ib.compact(); + jb.compact(); + } + } + } + } + + @Test + public void testProvidedReplicaRead() throws IOException { + + File providedFile = new File(BASE_DIR, FILE_NAME); + for(int i=0; i < replicas.size(); i++) { + ProvidedReplica replica = replicas.get(i); + //block data should exist! + assertTrue(replica.blockDataExists()); + assertEquals(providedFile.toURI(), replica.getBlockURI()); + verifyReplicaContents(providedFile, replica.getDataInputStream(0), + BLK_LEN*i, replica.getBlockDataLength()); + } + LOG.info("All replica contents verified"); + + providedFile.delete(); + //the block data should no longer be found! + for(int i=0; i < replicas.size(); i++) { + ProvidedReplica replica = replicas.get(i); + assertTrue(!replica.blockDataExists()); + } + } + +}