diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBPartHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBPartHandle.java new file mode 100644 index 0000000000..e1336b8085 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBPartHandle.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Byte array backed part handle. + */ +public final class BBPartHandle implements PartHandle { + + private static final long serialVersionUID = 0x23ce3eb1; + + private final byte[] bytes; + + private BBPartHandle(ByteBuffer byteBuffer) { + this.bytes = byteBuffer.array(); + } + + public static PartHandle from(ByteBuffer byteBuffer) { + return new BBPartHandle(byteBuffer); + } + + @Override + public ByteBuffer bytes() { + return ByteBuffer.wrap(bytes); + } + + @Override + public int hashCode() { + return Arrays.hashCode(bytes); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof PartHandle)) { + return false; + } + PartHandle o = (PartHandle) other; + return bytes().equals(o.bytes()); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBUploadHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBUploadHandle.java new file mode 100644 index 0000000000..6430c145e2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBUploadHandle.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Byte array backed upload handle.
+ */ +public final class BBUploadHandle implements UploadHandle { + + private static final long serialVersionUID = 0x69d5509b; + + private final byte[] bytes; + + private BBUploadHandle(ByteBuffer byteBuffer) { + this.bytes = byteBuffer.array(); + } + + public static UploadHandle from(ByteBuffer byteBuffer) { + return new BBUploadHandle(byteBuffer); + } + + @Override + public int hashCode() { + return Arrays.hashCode(bytes); + } + + @Override + public ByteBuffer bytes() { + return ByteBuffer.wrap(bytes); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof UploadHandle)) { + return false; + } + UploadHandle o = (UploadHandle) other; + return bytes().equals(o.bytes()); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java new file mode 100644 index 0000000000..b57ff3dc3a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import com.google.common.base.Charsets; +import org.apache.commons.compress.utils.IOUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A MultipartUploader that uses the basic FileSystem commands. + * This is done in three stages: + * Init - create a temp _multipart directory. + * PutPart - copy the individual parts of the file to the temp directory. + * Complete - use {@link FileSystem#concat} to merge the files, then delete + * the temp directory.
+ */ +public class FileSystemMultipartUploader extends MultipartUploader { + + private final FileSystem fs; + + public FileSystemMultipartUploader(FileSystem fs) { + this.fs = fs; + } + + @Override + public UploadHandle initialize(Path filePath) throws IOException { + Path collectorPath = createCollectorPath(filePath); + fs.mkdirs(collectorPath, FsPermission.getDirDefault()); + + ByteBuffer byteBuffer = ByteBuffer.wrap( + collectorPath.toString().getBytes(Charsets.UTF_8)); + return BBUploadHandle.from(byteBuffer); + } + + @Override + public PartHandle putPart(Path filePath, InputStream inputStream, + int partNumber, UploadHandle uploadId, long lengthInBytes) + throws IOException { + + byte[] uploadIdByteArray = uploadId.toByteArray(); + Path collectorPath = new Path(new String(uploadIdByteArray, 0, + uploadIdByteArray.length, Charsets.UTF_8)); + Path partPath = + Path.mergePaths(collectorPath, Path.mergePaths(new Path(Path.SEPARATOR), + new Path(Integer.toString(partNumber) + ".part"))); + FSDataOutputStreamBuilder outputStream = fs.createFile(partPath); + try (FSDataOutputStream fsDataOutputStream = outputStream.build()) { + IOUtils.copy(inputStream, fsDataOutputStream, 4096); + } + return BBPartHandle.from(ByteBuffer.wrap( + partPath.toString().getBytes(Charsets.UTF_8))); + } + + private Path createCollectorPath(Path filePath) { + return Path.mergePaths(filePath.getParent(), + Path.mergePaths(new Path(filePath.getName().split("\\.")[0]), + Path.mergePaths(new Path("_multipart"), + new Path(Path.SEPARATOR)))); + } + + @Override + @SuppressWarnings("deprecation") // rename w/ OVERWRITE + public PathHandle complete(Path filePath, + List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId) + throws IOException { + handles.sort(Comparator.comparing(Pair::getKey)); + List<Path> partHandles = handles + .stream() + .map(pair -> { + byte[] byteArray = pair.getValue().toByteArray(); + return new Path(new String(byteArray, 0, byteArray.length, + Charsets.UTF_8)); + }) + .collect(Collectors.toList()); + + Path collectorPath = createCollectorPath(filePath); + Path filePathInsideCollector = Path.mergePaths(collectorPath, + new Path(Path.SEPARATOR + filePath.getName())); + fs.create(filePathInsideCollector).close(); + fs.concat(filePathInsideCollector, + partHandles.toArray(new Path[handles.size()])); + fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE); + fs.delete(collectorPath, true); + FileStatus status = fs.getFileStatus(filePath); + return fs.getPathHandle(status); + } + + @Override + public void abort(Path filePath, UploadHandle uploadId) throws IOException { + byte[] uploadIdByteArray = uploadId.toByteArray(); + Path collectorPath = new Path(new String(uploadIdByteArray, 0, + uploadIdByteArray.length, Charsets.UTF_8)); + fs.delete(collectorPath, true); + } + + /** + * Factory for creating MultipartUploader objects for file:// + * filesystems.
+ */ + public static class Factory extends MultipartUploaderFactory { + protected MultipartUploader createMultipartUploader(FileSystem fs, + Configuration conf) { + if (fs.getScheme().equals("file")) { + return new FileSystemMultipartUploader(fs); + } + return null; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemPathHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemPathHandle.java new file mode 100644 index 0000000000..a6b37b32bb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemPathHandle.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.fs.FSProtos.LocalFileSystemPathHandleProto; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.Optional; + +/** + * Opaque handle to an entity in a FileSystem. + */ +public class LocalFileSystemPathHandle implements PathHandle { + + private final String path; + private final Long mtime; + + public LocalFileSystemPathHandle(String path, Optional<Long> mtime) { + this.path = path; + this.mtime = mtime.orElse(null); + } + + public LocalFileSystemPathHandle(ByteBuffer bytes) throws IOException { + if (null == bytes) { + throw new IOException("Missing PathHandle"); + } + LocalFileSystemPathHandleProto p = + LocalFileSystemPathHandleProto.parseFrom(ByteString.copyFrom(bytes)); + path = p.hasPath() ? p.getPath() : null; + mtime = p.hasMtime() ?
p.getMtime() : null; + } + + public String getPath() { + return path; + } + + public void verify(FileStatus stat) throws InvalidPathHandleException { + if (null == stat) { + throw new InvalidPathHandleException("Could not resolve handle"); + } + if (mtime != null && mtime != stat.getModificationTime()) { + throw new InvalidPathHandleException("Content changed"); + } + } + + @Override + public ByteBuffer bytes() { + LocalFileSystemPathHandleProto.Builder b = + LocalFileSystemPathHandleProto.newBuilder(); + b.setPath(path); + if (mtime != null) { + b.setMtime(mtime); + } + return b.build().toByteString().asReadOnlyByteBuffer(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LocalFileSystemPathHandle that = (LocalFileSystemPathHandle) o; + return Objects.equals(path, that.path) && + Objects.equals(mtime, that.mtime); + } + + @Override + public int hashCode() { + return Objects.hash(path, mtime); + } + + @Override + public String toString() { + return "LocalFileSystemPathHandle{" + + "path='" + path + '\'' + + ", mtime=" + mtime + + '}'; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java new file mode 100644 index 0000000000..24a92169a2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import org.apache.commons.lang3.tuple.Pair; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MultipartUploader is an interface for uploading files in multiple parts, + * possibly from multiple nodes. Users should: + * 1. Initialize an upload. + * 2. Upload parts in any order. + * 3. Complete the upload in order to have it materialize in the destination + * FS. + * + * Implementers should ensure that 'complete' reorders parts if the + * destination FS doesn't already do it for them. + */ +public abstract class MultipartUploader { + public static final Logger LOG = + LoggerFactory.getLogger(MultipartUploader.class); + + /** + * Initialize a multipart upload. + * @param filePath Target path for upload. + * @return unique identifier associating part uploads. + * @throws IOException IO failure + */ + public abstract UploadHandle initialize(Path filePath) throws IOException; + + /** + * Put part as part of a multipart upload. It should be possible to have + * parts uploaded in any order (or in parallel).
+ * @param filePath Target path for upload (same as {@link #initialize(Path)}). + * @param inputStream Data for this part. + * @param partNumber Index of the part relative to others. + * @param uploadId Identifier from {@link #initialize(Path)}. + * @param lengthInBytes Target length to read from the stream. + * @return unique PartHandle identifier for the uploaded part. + * @throws IOException IO failure + */ + public abstract PartHandle putPart(Path filePath, InputStream inputStream, + int partNumber, UploadHandle uploadId, long lengthInBytes) + throws IOException; + + /** + * Complete a multipart upload. + * @param filePath Target path for upload (same as {@link #initialize(Path)}). + * @param handles Identifiers with associated part numbers from + * {@link #putPart(Path, InputStream, int, UploadHandle, long)}. + * Depending on the backend, the list order may be significant. + * @param multipartUploadId Identifier from {@link #initialize(Path)}. + * @return unique PathHandle identifier for the uploaded file. + * @throws IOException IO failure + */ + public abstract PathHandle complete(Path filePath, + List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId) + throws IOException; + + /** + * Aborts a multipart upload. + * @param filePath Target path for upload (same as {@link #initialize(Path)}). + * @param multipartUploadId Identifier from {@link #initialize(Path)}. + * @throws IOException IO failure + */ + public abstract void abort(Path filePath, UploadHandle multipartUploadId) + throws IOException; + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java new file mode 100644 index 0000000000..b0fa798ee2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.ServiceLoader; + +/** + * {@link ServiceLoader}-driven uploader API for storage services supporting + * multipart uploads. + */ +public abstract class MultipartUploaderFactory { + public static final Logger LOG = + LoggerFactory.getLogger(MultipartUploaderFactory.class); + + /** + * Multipart Uploaders listed as services. + */ + private static ServiceLoader<MultipartUploaderFactory> serviceLoader = + ServiceLoader.load(MultipartUploaderFactory.class, + MultipartUploaderFactory.class.getClassLoader()); + + // Iterate through the serviceLoader to avoid lazy loading. + // Lazy loading would require synchronization in concurrent use cases.
+ static { + Iterator<MultipartUploaderFactory> iterServices = serviceLoader.iterator(); + while (iterServices.hasNext()) { + iterServices.next(); + } + } + + public static MultipartUploader get(FileSystem fs, Configuration conf) + throws IOException { + MultipartUploader mpu = null; + for (MultipartUploaderFactory factory : serviceLoader) { + mpu = factory.createMultipartUploader(fs, conf); + if (mpu != null) { + break; + } + } + return mpu; + } + + protected abstract MultipartUploader createMultipartUploader(FileSystem fs, + Configuration conf) throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java new file mode 100644 index 0000000000..df70b746cc --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.Serializable; +import java.nio.ByteBuffer; + +/** + * Opaque, serializable reference to a part id for multipart uploads. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface PartHandle extends Serializable { + /** + * @return Serialized form in bytes.
+ */ + default byte[] toByteArray() { + ByteBuffer bb = bytes(); + byte[] ret = new byte[bb.remaining()]; + bb.get(ret); + return ret; + } + + ByteBuffer bytes(); + + @Override + boolean equals(Object other); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index c0f81997b8..bd003ae90a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -40,6 +40,7 @@ import java.nio.file.attribute.FileTime; import java.util.Arrays; import java.util.EnumSet; +import java.util.Optional; import java.util.StringTokenizer; import org.apache.hadoop.classification.InterfaceAudience; @@ -212,7 +213,19 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { return new FSDataInputStream(new BufferedFSInputStream( new LocalFSFileInputStream(f), bufferSize)); } - + + @Override + public FSDataInputStream open(PathHandle fd, int bufferSize) + throws IOException { + if (!(fd instanceof LocalFileSystemPathHandle)) { + fd = new LocalFileSystemPathHandle(fd.bytes()); + } + LocalFileSystemPathHandle id = (LocalFileSystemPathHandle) fd; + id.verify(getFileStatus(new Path(id.getPath()))); + return new FSDataInputStream(new BufferedFSInputStream( + new LocalFSFileInputStream(new Path(id.getPath())), bufferSize)); + } + /********************************************************* * For create()'s FSOutputStream. *********************************************************/ @@ -246,7 +259,7 @@ private LocalFSFileOutputStream(Path f, boolean append, } } } - + /* * Just forward to the fos */ @@ -350,6 +363,18 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, return out; } + @Override + public void concat(final Path trg, final Path [] psrcs) throws IOException { + final int bufferSize = 4096; + try(FSDataOutputStream out = create(trg)) { + for (Path src : psrcs) { + try(FSDataInputStream in = open(src)) { + IOUtils.copyBytes(in, out, bufferSize, false); + } + } + } + } + @Override public boolean rename(Path src, Path dst) throws IOException { // Attempt rename using Java API. @@ -863,6 +888,38 @@ public void setTimes(Path p, long mtime, long atime) throws IOException { } } + /** + * Hook to implement support for {@link PathHandle} operations. + * @param stat Referent in the target FileSystem + * @param opts Constraints that determine the validity of the + * {@link PathHandle} reference. + */ + protected PathHandle createPathHandle(FileStatus stat, + Options.HandleOpt... 
opts) { + if (stat.isDirectory() || stat.isSymlink()) { + throw new IllegalArgumentException("PathHandle only available for files"); + } + String authority = stat.getPath().toUri().getAuthority(); + if (authority != null && !authority.equals("file://")) { + throw new IllegalArgumentException("Wrong FileSystem: " + stat.getPath()); + } + Options.HandleOpt.Data data = + Options.HandleOpt.getOpt(Options.HandleOpt.Data.class, opts) + .orElse(Options.HandleOpt.changed(false)); + Options.HandleOpt.Location loc = + Options.HandleOpt.getOpt(Options.HandleOpt.Location.class, opts) + .orElse(Options.HandleOpt.moved(false)); + if (loc.allowChange()) { + throw new UnsupportedOperationException("Tracking file movement in " + + "basic FileSystem is not supported"); + } + final Path p = stat.getPath(); + final Optional<Long> mtime = !data.allowChange() + ? Optional.of(stat.getModificationTime()) + : Optional.empty(); + return new LocalFileSystemPathHandle(p.toString(), mtime); + } + @Override public boolean supportsSymlinks() { return true; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnsupportedMultipartUploaderException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnsupportedMultipartUploaderException.java new file mode 100644 index 0000000000..5606a80dec --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnsupportedMultipartUploaderException.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * MultipartUploader for a given file system name/scheme is not supported. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class UnsupportedMultipartUploaderException extends IOException { + private static final long serialVersionUID = 1L; + + /** + * Constructs exception with the specified detail message. + * + * @param message exception message. + */ + public UnsupportedMultipartUploaderException(final String message) { + super(message); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UploadHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UploadHandle.java new file mode 100644 index 0000000000..143b4d1584 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UploadHandle.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.Serializable; +import java.nio.ByteBuffer; + +/** + * Opaque, serializable reference to an uploadId for multipart uploads. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface UploadHandle extends Serializable { + + /** + * @return Serialized form in bytes. + */ + default byte[] toByteArray() { + ByteBuffer bb = bytes(); + byte[] ret = new byte[bb.remaining()]; + bb.get(ret); + return ret; + } + + ByteBuffer bytes(); + + @Override + boolean equals(Object other); + +} diff --git a/hadoop-common-project/hadoop-common/src/main/proto/FSProtos.proto b/hadoop-common-project/hadoop-common/src/main/proto/FSProtos.proto index 5b8c45d0ad..c3b768ab67 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/FSProtos.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/FSProtos.proto @@ -68,3 +68,11 @@ message FileStatusProto { optional bytes ec_data = 17; optional uint32 flags = 18 [default = 0]; } + +/** + * Placeholder type for consistent basic FileSystem operations. + */ +message LocalFileSystemPathHandleProto { + optional uint64 mtime = 1; + optional string path = 2; +} diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory new file mode 100644 index 0000000000..f0054fedb8 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +org.apache.hadoop.fs.FileSystemMultipartUploader$Factory diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java new file mode 100644 index 0000000000..f132089a9e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public abstract class AbstractSystemMultipartUploaderTest { + + abstract FileSystem getFS() throws IOException; + + abstract Path getBaseTestPath(); + + @Test + public void testMultipartUpload() throws Exception { + FileSystem fs = getFS(); + Path file = new Path(getBaseTestPath(), "some-file"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + for (int i = 1; i <= 100; ++i) { + String contents = "ThisIsPart" + i + "\n"; + sb.append(contents); + int len = contents.getBytes().length; + InputStream is = IOUtils.toInputStream(contents, "UTF-8"); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); + partHandles.add(Pair.of(i, partHandle)); + } + PathHandle fd = mpu.complete(file, partHandles, uploadHandle); + byte[] fdData = IOUtils.toByteArray(fs.open(fd)); + byte[] fileData = IOUtils.toByteArray(fs.open(file)); + String readString = new String(fdData); + assertEquals(sb.toString(), readString); + assertArrayEquals(fdData, fileData); + } + + @Test + public void testMultipartUploadReverseOrder() throws Exception { + FileSystem fs = getFS(); + Path file = new Path(getBaseTestPath(), "some-file"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + for (int i = 1; i <= 100; ++i) { + String contents = "ThisIsPart" + i + "\n"; + sb.append(contents); + } + for (int i = 100; i > 0; --i) { + String contents = "ThisIsPart" + i + "\n"; + int len = contents.getBytes().length; + InputStream is = IOUtils.toInputStream(contents, "UTF-8"); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
partHandles.add(Pair.of(i, partHandle)); + } + PathHandle fd = mpu.complete(file, partHandles, uploadHandle); + byte[] fdData = IOUtils.toByteArray(fs.open(fd)); + byte[] fileData = IOUtils.toByteArray(fs.open(file)); + String readString = new String(fdData); + assertEquals(sb.toString(), readString); + assertArrayEquals(fdData, fileData); + } + + @Test + public void testMultipartUploadReverseOrderNonContiguousPartNumbers() + throws Exception { + FileSystem fs = getFS(); + Path file = new Path(getBaseTestPath(), "some-file"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + for (int i = 2; i <= 200; i += 2) { + String contents = "ThisIsPart" + i + "\n"; + sb.append(contents); + } + for (int i = 200; i > 0; i -= 2) { + String contents = "ThisIsPart" + i + "\n"; + int len = contents.getBytes().length; + InputStream is = IOUtils.toInputStream(contents, "UTF-8"); + PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len); + partHandles.add(Pair.of(i, partHandle)); + } + PathHandle fd = mpu.complete(file, partHandles, uploadHandle); + byte[] fdData = IOUtils.toByteArray(fs.open(fd)); + byte[] fileData = IOUtils.toByteArray(fs.open(file)); + String readString = new String(fdData); + assertEquals(sb.toString(), readString); + assertArrayEquals(fdData, fileData); + } + + @Test + public void testMultipartUploadAbort() throws Exception { + FileSystem fs = getFS(); + Path file = new Path(getBaseTestPath(), "some-file"); + MultipartUploader mpu = MultipartUploaderFactory.get(fs, null); + UploadHandle uploadHandle = mpu.initialize(file); + for (int i = 100; i >= 50; --i) { + String contents = "ThisIsPart" + i + "\n"; + int len = contents.getBytes().length; + InputStream is = IOUtils.toInputStream(contents, "UTF-8"); + mpu.putPart(file, is, i, uploadHandle, len); + } + mpu.abort(file, uploadHandle); + + String contents = "ThisIsPart49\n"; + int len = contents.getBytes().length; + InputStream is = IOUtils.toInputStream(contents, "UTF-8"); + + try { + mpu.putPart(file, is, 49, uploadHandle, len); + fail("putPart should have thrown an exception"); + } catch (IOException ok) { + // ignore + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java new file mode 100644 index 0000000000..21d01b6cdb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.test.GenericTestUtils.getRandomizedTestDir; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.File; +import java.io.IOException; + +/** + * Test the FileSystemMultipartUploader on local file system. + */ +public class TestLocalFileSystemMultipartUploader + extends AbstractSystemMultipartUploaderTest { + + private static FileSystem fs; + private File tmp; + + @BeforeClass + public static void init() throws IOException { + fs = LocalFileSystem.getLocal(new Configuration()); + } + + @Before + public void setup() throws IOException { + tmp = getRandomizedTestDir(); + tmp.mkdirs(); + } + + @After + public void tearDown() throws IOException { + FileUtil.fullyDelete(tmp); + } + + @Override + public FileSystem getFS() { + return fs; + } + + @Override + public Path getBaseTestPath() { + return new Path(tmp.getAbsolutePath()); + } + +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractPathHandleTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractPathHandleTest.java index fbe28c3c24..36cfa6ccda 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractPathHandleTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractPathHandleTest.java @@ -123,6 +123,12 @@ public void testChanged() throws IOException { HandleOpt.Data data = HandleOpt.getOpt(HandleOpt.Data.class, opts) .orElseThrow(IllegalArgumentException::new); FileStatus stat = testFile(B1); + try { + // Temporary workaround while RawLocalFS supports only second precision + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new IOException(e); + } // modify the file by appending data appendFile(getFileSystem(), stat.getPath(), B2); byte[] b12 = Arrays.copyOf(B1, B1.length + B2.length); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractPathHandle.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractPathHandle.java new file mode 100644 index 0000000000..3c088d278e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractPathHandle.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.fs.contract.rawlocal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.contract.AbstractContractPathHandleTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestRawlocalContractPathHandle + extends AbstractContractPathHandleTest { + + public TestRawlocalContractPathHandle(String testname, + Options.HandleOpt[] opts, boolean serialized) { + super(testname, opts, serialized); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RawlocalFSContract(conf); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml index a0d1d21a94..8cbd4a0abc 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml @@ -122,4 +122,9 @@ <value>true</value> </property> + <property> + <name>fs.contract.supports-content-check</name> + <value>true</value> + </property> + </configuration> diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java new file mode 100644 index 0000000000..e9959c192d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemMultipartUploader; +import org.apache.hadoop.fs.MultipartUploader; +import org.apache.hadoop.fs.MultipartUploaderFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; + +/** + * Support for HDFS multipart uploads, built on + * {@link FileSystem#concat(Path, Path[])}.
+ */ +public class DFSMultipartUploaderFactory extends MultipartUploaderFactory { + protected MultipartUploader createMultipartUploader(FileSystem fs, + Configuration conf) { + if (fs.getScheme().equals(HdfsConstants.HDFS_URI_SCHEME)) { + return new FileSystemMultipartUploader(fs); + } + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory new file mode 100644 index 0000000000..b153fd9924 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.hadoop.hdfs.DFSMultipartUploaderFactory diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java new file mode 100644 index 0000000000..96c50938b3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TestName; + +import java.io.IOException; + +public class TestHDFSMultipartUploader + extends AbstractSystemMultipartUploaderTest { + + private static MiniDFSCluster cluster; + private Path tmp; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void init() throws IOException { + HdfsConfiguration conf = new HdfsConfiguration(); + cluster = new MiniDFSCluster.Builder(conf, + GenericTestUtils.getRandomizedTestDir()) + .numDataNodes(1) + .build(); + cluster.waitClusterUp(); + } + + @AfterClass + public static void cleanup() throws IOException { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Before + public void setup() throws IOException { + tmp = new Path(cluster.getFileSystem().getWorkingDirectory(), + name.getMethodName()); + cluster.getFileSystem().mkdirs(tmp); + } + + @Override + public FileSystem getFS() throws IOException { + return cluster.getFileSystem(); + } + + @Override + public Path getBaseTestPath() { + return tmp; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java new file mode 100644 index 0000000000..34c88d43f6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import com.google.common.base.Charsets; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BBPartHandle; +import org.apache.hadoop.fs.BBUploadHandle; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.MultipartUploader; +import org.apache.hadoop.fs.MultipartUploaderFactory; +import org.apache.hadoop.fs.PartHandle; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathHandle; +import org.apache.hadoop.fs.UploadHandle; +import org.apache.hadoop.hdfs.DFSUtilClient; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; + +/** + * MultipartUploader for S3AFileSystem. This uses the S3 multipart + * upload mechanism. + */ +public class S3AMultipartUploader extends MultipartUploader { + + private final S3AFileSystem s3a; + + public S3AMultipartUploader(FileSystem fs, Configuration conf) { + if (!(fs instanceof S3AFileSystem)) { + throw new IllegalArgumentException( + "S3A MultipartUploads must use S3AFileSystem"); + } + s3a = (S3AFileSystem) fs; + } + + @Override + public UploadHandle initialize(Path filePath) throws IOException { + String key = s3a.pathToKey(filePath); + InitiateMultipartUploadRequest request = + new InitiateMultipartUploadRequest(s3a.getBucket(), key); + LOG.debug("initialize request: {}", request); + InitiateMultipartUploadResult result = s3a.initiateMultipartUpload(request); + String uploadId = result.getUploadId(); + return BBUploadHandle.from(ByteBuffer.wrap( + uploadId.getBytes(Charsets.UTF_8))); + } + + @Override + public PartHandle putPart(Path filePath, InputStream inputStream, + int partNumber, UploadHandle uploadId, long lengthInBytes) { + String key = s3a.pathToKey(filePath); + UploadPartRequest request = new UploadPartRequest(); + byte[] uploadIdBytes = uploadId.toByteArray(); + request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length, + Charsets.UTF_8)); + request.setInputStream(inputStream); + request.setPartSize(lengthInBytes); + request.setPartNumber(partNumber); + request.setBucketName(s3a.getBucket()); + request.setKey(key); + LOG.debug("putPart request: {}", request); + UploadPartResult result = s3a.uploadPart(request); + String eTag = result.getETag(); + return BBPartHandle.from(ByteBuffer.wrap(eTag.getBytes(Charsets.UTF_8))); + } + + @Override + public PathHandle complete(Path filePath, + List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId) { + String key = s3a.pathToKey(filePath); + CompleteMultipartUploadRequest request = + new CompleteMultipartUploadRequest(); + request.setBucketName(s3a.getBucket()); + request.setKey(key); + byte[] uploadIdBytes = uploadId.toByteArray(); + request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length, + Charsets.UTF_8)); + List<PartETag> eTags = handles + .stream() + .map(handle -> { + byte[] partEtagBytes = handle.getRight().toByteArray(); + return new
PartETag(handle.getLeft(), + new String(partEtagBytes, 0, partEtagBytes.length, + Charsets.UTF_8)); + }) + .collect(Collectors.toList()); + request.setPartETags(eTags); + LOG.debug("Complete request: {}", request); + CompleteMultipartUploadResult completeMultipartUploadResult = + s3a.getAmazonS3Client().completeMultipartUpload(request); + + byte[] eTag = DFSUtilClient.string2Bytes( + completeMultipartUploadResult.getETag()); + return (PathHandle) () -> ByteBuffer.wrap(eTag); + } + + @Override + public void abort(Path filePath, UploadHandle uploadId) { + String key = s3a.pathToKey(filePath); + byte[] uploadIdBytes = uploadId.toByteArray(); + String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length, + Charsets.UTF_8); + AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(s3a + .getBucket(), key, uploadIdString); + LOG.debug("Abort request: {}", request); + s3a.getAmazonS3Client().abortMultipartUpload(request); + } + + /** + * Factory for creating MultipartUploader objects for s3a:// FileSystems. + */ + public static class Factory extends MultipartUploaderFactory { + @Override + protected MultipartUploader createMultipartUploader(FileSystem fs, + Configuration conf) { + if (fs.getScheme().equals("s3a")) { + return new S3AMultipartUploader(fs, conf); + } + return null; + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory new file mode 100644 index 0000000000..2e4bc241d0 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader new file mode 100644 index 0000000000..d16846b25b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.hadoop.fs.s3a.S3AMultipartUploader
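For anyone trying the new API end to end, here is a minimal client sketch appended after the patch rather than part of it. The class name, destination path, and part contents below are invented for illustration; it exercises initialize/putPart/complete in the order the MultipartUploader javadoc prescribes, against the local filesystem, though any FileSystem whose factory is registered under META-INF/services would be resolved the same way.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

public class MultipartUploadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    // Hypothetical destination; the parent directory must exist.
    Path dest = new Path("/tmp/multipart-example/target-file");
    fs.mkdirs(dest.getParent());

    // Resolve an uploader through the ServiceLoader-driven factory.
    MultipartUploader mpu = MultipartUploaderFactory.get(fs, conf);

    UploadHandle upload = mpu.initialize(dest);
    List<Pair<Integer, PartHandle>> parts = new ArrayList<>();
    // Parts may be uploaded in any order; part numbers define the final order.
    for (int i = 1; i <= 3; i++) {
      byte[] data = ("part-" + i + "\n").getBytes(StandardCharsets.UTF_8);
      try (InputStream in = new ByteArrayInputStream(data)) {
        parts.add(Pair.of(i, mpu.putPart(dest, in, i, upload, data.length)));
      }
    }

    // complete() materializes the file and returns a PathHandle to it.
    PathHandle fd = mpu.complete(dest, parts, upload);
    try (FSDataInputStream in = fs.open(fd)) {
      byte[] buf = new byte[64];
      int n = in.read(buf);
      System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8));
    }
  }
}

Under these assumptions, the parts could equally be uploaded in reverse order or from different processes, since only the part numbers carried in the Pair list determine how 'complete' assembles the file.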