HADOOP-14383. Implement FileSystem that reads from HTTP / HTTPS endpoints.
This commit is contained in:
parent
424887ecb7
commit
ff5ec3b841
@ -318,6 +318,11 @@
|
||||
<artifactId>aalto-xml</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>mockwebserver</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -0,0 +1,153 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.http;
|
||||
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PositionedReadable;
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URLConnection;
|
||||
|
||||
abstract class AbstractHttpFileSystem extends FileSystem {
|
||||
private static final long DEFAULT_BLOCK_SIZE = 4096;
|
||||
private static final Path WORKING_DIR = new Path("/");
|
||||
|
||||
private URI uri;
|
||||
|
||||
@Override
|
||||
public void initialize(URI name, Configuration conf) throws IOException {
|
||||
super.initialize(name, conf);
|
||||
this.uri = name;
|
||||
}
|
||||
|
||||
public abstract String getScheme();
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
|
||||
URLConnection conn = path.toUri().toURL().openConnection();
|
||||
InputStream in = conn.getInputStream();
|
||||
return new FSDataInputStream(new HttpDataInputStream(in));
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream create(Path path, FsPermission fsPermission,
|
||||
boolean b, int i, short i1, long l,
|
||||
Progressable progressable)
|
||||
throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream append(Path path, int i, Progressable progressable)
|
||||
throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rename(Path path, Path path1) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(Path path, boolean b) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus[] listStatus(Path path) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWorkingDirectory(Path path) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getWorkingDirectory() {
|
||||
return WORKING_DIR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path path, FsPermission fsPermission)
|
||||
throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path path) throws IOException {
|
||||
return new FileStatus(-1, false, 1, DEFAULT_BLOCK_SIZE, 0, path);
|
||||
}
|
||||
|
||||
private static class HttpDataInputStream extends FilterInputStream
|
||||
implements Seekable, PositionedReadable {
|
||||
|
||||
HttpDataInputStream(InputStream in) {
|
||||
super(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFully(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFully(long position, byte[] buffer) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seek(long pos) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPos() throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.http;
|
||||
|
||||
/**
|
||||
* A Filesystem that reads from HTTP endpoint.
|
||||
*/
|
||||
public class HttpFileSystem extends AbstractHttpFileSystem {
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "http";
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.http;
|
||||
|
||||
/**
|
||||
* A Filesystem that reads from HTTPS endpoint.
|
||||
*/
|
||||
public class HttpsFileSystem extends AbstractHttpFileSystem {
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "https";
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Filesystem implementations that allow Hadoop to read directly from
|
||||
* HTTP / HTTPS endpoints.
|
||||
*/
|
||||
package org.apache.hadoop.fs.http;
|
@ -17,3 +17,5 @@ org.apache.hadoop.fs.LocalFileSystem
|
||||
org.apache.hadoop.fs.viewfs.ViewFileSystem
|
||||
org.apache.hadoop.fs.ftp.FTPFileSystem
|
||||
org.apache.hadoop.fs.HarFileSystem
|
||||
org.apache.hadoop.fs.http.HttpFileSystem
|
||||
org.apache.hadoop.fs.http.HttpsFileSystem
|
||||
|
@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.http;
|
||||
|
||||
import okhttp3.mockwebserver.MockResponse;
|
||||
import okhttp3.mockwebserver.MockWebServer;
|
||||
import okhttp3.mockwebserver.RecordedRequest;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* Testing HttpFileSystem.
|
||||
*/
|
||||
public class TestHttpFileSystem {
|
||||
@Test
|
||||
public void testHttpFileSystem() throws IOException, URISyntaxException,
|
||||
InterruptedException {
|
||||
Configuration conf = new Configuration(false);
|
||||
conf.set("fs.http.impl", HttpFileSystem.class.getCanonicalName());
|
||||
final String data = "foo";
|
||||
|
||||
try (MockWebServer server = new MockWebServer()) {
|
||||
server.enqueue(new MockResponse().setBody(data));
|
||||
server.start();
|
||||
URI uri = URI.create(String.format("http://%s:%d", server.getHostName(),
|
||||
server.getPort()));
|
||||
FileSystem fs = FileSystem.get(uri, conf);
|
||||
try (InputStream is = fs.open(
|
||||
new Path(new URL(uri.toURL(), "/foo").toURI()),
|
||||
4096)) {
|
||||
byte[] buf = new byte[data.length()];
|
||||
IOUtils.readFully(is, buf, 0, buf.length);
|
||||
assertEquals(data, new String(buf, StandardCharsets.UTF_8));
|
||||
}
|
||||
RecordedRequest req = server.takeRequest();
|
||||
assertEquals("/foo", req.getPath());
|
||||
}
|
||||
}
|
||||
}
|
@ -145,6 +145,12 @@
|
||||
<artifactId>okhttp</artifactId>
|
||||
<version>2.4.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>mockwebserver</artifactId>
|
||||
<version>3.7.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>jdiff</groupId>
|
||||
<artifactId>jdiff</artifactId>
|
||||
|
Loading…
Reference in New Issue
Block a user