HADOOP-15320. Remove customized getFileBlockLocations for hadoop-azure and hadoop-azure-datalake. Contributed by Shanyu Zhao
This commit is contained in:
parent
0b1c2b5fe1
commit
081c350188
@ -46,7 +46,6 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.ContentSummary.Builder;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
@ -910,45 +909,6 @@ public long getBlockSize(Path f) throws IOException {
|
||||
return ADL_BLOCK_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockLocation[] getFileBlockLocations(final FileStatus status,
|
||||
final long offset, final long length) throws IOException {
|
||||
if (status == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if ((offset < 0) || (length < 0)) {
|
||||
throw new IllegalArgumentException("Invalid start or len parameter");
|
||||
}
|
||||
|
||||
if (status.getLen() < offset) {
|
||||
return new BlockLocation[0];
|
||||
}
|
||||
|
||||
final String[] name = {"localhost"};
|
||||
final String[] host = {"localhost"};
|
||||
long blockSize = ADL_BLOCK_SIZE;
|
||||
int numberOfLocations =
|
||||
(int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
|
||||
BlockLocation[] locations = new BlockLocation[numberOfLocations];
|
||||
for (int i = 0; i < locations.length; i++) {
|
||||
long currentOffset = offset + (i * blockSize);
|
||||
long currentLength = Math.min(blockSize, offset + length - currentOffset);
|
||||
locations[i] = new BlockLocation(name, host, currentOffset,
|
||||
currentLength);
|
||||
}
|
||||
|
||||
return locations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
|
||||
final long length) throws IOException {
|
||||
// read ops incremented in getFileStatus
|
||||
FileStatus fileStatus = getFileStatus(p);
|
||||
return getFileBlockLocations(fileStatus, offset, length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get replication.
|
||||
*
|
||||
|
@ -52,7 +52,6 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.BufferedFSInputStream;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
@ -726,10 +725,6 @@ public String getScheme() {
|
||||
|
||||
static final String AZURE_CHMOD_USERLIST_PROPERTY_DEFAULT_VALUE = "*";
|
||||
|
||||
static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
|
||||
"fs.azure.block.location.impersonatedhost";
|
||||
private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
|
||||
"localhost";
|
||||
static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
|
||||
"fs.azure.ring.buffer.capacity";
|
||||
static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
|
||||
@ -3469,47 +3464,6 @@ private void performStickyBitCheckForRenameOperation(Path srcPath,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an array containing hostnames, offset and size of
|
||||
* portions of the given file. For WASB we'll just lie and give
|
||||
* fake hosts to make sure we get many splits in MR jobs.
|
||||
*/
|
||||
@Override
|
||||
public BlockLocation[] getFileBlockLocations(FileStatus file,
|
||||
long start, long len) throws IOException {
|
||||
if (file == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if ((start < 0) || (len < 0)) {
|
||||
throw new IllegalArgumentException("Invalid start or len parameter");
|
||||
}
|
||||
|
||||
if (file.getLen() < start) {
|
||||
return new BlockLocation[0];
|
||||
}
|
||||
final String blobLocationHost = getConf().get(
|
||||
AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
|
||||
AZURE_BLOCK_LOCATION_HOST_DEFAULT);
|
||||
final String[] name = { blobLocationHost };
|
||||
final String[] host = { blobLocationHost };
|
||||
long blockSize = file.getBlockSize();
|
||||
if (blockSize <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"The block size for the given file is not a positive number: "
|
||||
+ blockSize);
|
||||
}
|
||||
int numberOfLocations = (int) (len / blockSize)
|
||||
+ ((len % blockSize == 0) ? 0 : 1);
|
||||
BlockLocation[] locations = new BlockLocation[numberOfLocations];
|
||||
for (int i = 0; i < locations.length; i++) {
|
||||
long currentOffset = start + (i * blockSize);
|
||||
long currentLength = Math.min(blockSize, start + len - currentOffset);
|
||||
locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
|
||||
}
|
||||
return locations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the working directory to the given directory.
|
||||
*/
|
||||
|
@ -1,141 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test block location logic.
|
||||
*/
|
||||
public class TestNativeAzureFileSystemBlockLocations
|
||||
extends AbstractWasbTestWithTimeout {
|
||||
@Test
|
||||
public void testNumberOfBlocks() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NativeAzureFileSystem.AZURE_BLOCK_SIZE_PROPERTY_NAME, "500");
|
||||
AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
|
||||
.createMock(conf);
|
||||
FileSystem fs = testAccount.getFileSystem();
|
||||
Path testFile = createTestFile(fs, 1200);
|
||||
FileStatus stat = fs.getFileStatus(testFile);
|
||||
assertEquals(500, stat.getBlockSize());
|
||||
testAccount.cleanup();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockLocationsTypical() throws Exception {
|
||||
BlockLocation[] locations = getBlockLocationsOutput(210, 50, 0, 210);
|
||||
assertEquals(5, locations.length);
|
||||
assertEquals("localhost", locations[0].getHosts()[0]);
|
||||
assertEquals(50, locations[0].getLength());
|
||||
assertEquals(10, locations[4].getLength());
|
||||
assertEquals(100, locations[2].getOffset());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockLocationsEmptyFile() throws Exception {
|
||||
BlockLocation[] locations = getBlockLocationsOutput(0, 50, 0, 0);
|
||||
assertEquals(0, locations.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockLocationsSmallFile() throws Exception {
|
||||
BlockLocation[] locations = getBlockLocationsOutput(1, 50, 0, 1);
|
||||
assertEquals(1, locations.length);
|
||||
assertEquals(1, locations[0].getLength());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockLocationsExactBlockSizeMultiple() throws Exception {
|
||||
BlockLocation[] locations = getBlockLocationsOutput(200, 50, 0, 200);
|
||||
assertEquals(4, locations.length);
|
||||
assertEquals(150, locations[3].getOffset());
|
||||
assertEquals(50, locations[3].getLength());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockLocationsSubsetOfFile() throws Exception {
|
||||
BlockLocation[] locations = getBlockLocationsOutput(205, 10, 15, 35);
|
||||
assertEquals(4, locations.length);
|
||||
assertEquals(10, locations[0].getLength());
|
||||
assertEquals(15, locations[0].getOffset());
|
||||
assertEquals(5, locations[3].getLength());
|
||||
assertEquals(45, locations[3].getOffset());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockLocationsOutOfRangeSubsetOfFile() throws Exception {
|
||||
BlockLocation[] locations = getBlockLocationsOutput(205, 10, 300, 10);
|
||||
assertEquals(0, locations.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockLocationsEmptySubsetOfFile() throws Exception {
|
||||
BlockLocation[] locations = getBlockLocationsOutput(205, 10, 0, 0);
|
||||
assertEquals(0, locations.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockLocationsDifferentLocationHost() throws Exception {
|
||||
BlockLocation[] locations = getBlockLocationsOutput(100, 10, 0, 100,
|
||||
"myblobhost");
|
||||
assertEquals(10, locations.length);
|
||||
assertEquals("myblobhost", locations[0].getHosts()[0]);
|
||||
}
|
||||
|
||||
private static BlockLocation[] getBlockLocationsOutput(int fileSize,
|
||||
int blockSize, long start, long len) throws Exception {
|
||||
return getBlockLocationsOutput(fileSize, blockSize, start, len, null);
|
||||
}
|
||||
|
||||
private static BlockLocation[] getBlockLocationsOutput(int fileSize,
|
||||
int blockSize, long start, long len, String blockLocationHost)
|
||||
throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NativeAzureFileSystem.AZURE_BLOCK_SIZE_PROPERTY_NAME, ""
|
||||
+ blockSize);
|
||||
if (blockLocationHost != null) {
|
||||
conf.set(NativeAzureFileSystem.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
|
||||
blockLocationHost);
|
||||
}
|
||||
AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
|
||||
.createMock(conf);
|
||||
FileSystem fs = testAccount.getFileSystem();
|
||||
Path testFile = createTestFile(fs, fileSize);
|
||||
FileStatus stat = fs.getFileStatus(testFile);
|
||||
BlockLocation[] locations = fs.getFileBlockLocations(stat, start, len);
|
||||
testAccount.cleanup();
|
||||
return locations;
|
||||
}
|
||||
|
||||
private static Path createTestFile(FileSystem fs, int size) throws Exception {
|
||||
Path testFile = new Path("/testFile");
|
||||
OutputStream outputStream = fs.create(testFile);
|
||||
outputStream.write(new byte[size]);
|
||||
outputStream.close();
|
||||
return testFile;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user