+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * BufferPool class is used to manage the buffers during program execution.
+ * It is provided in a thread-safe singleton mode,and
+ * keeps the program's memory and disk consumption at a stable value.
+ */
+public final class BufferPool {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BufferPool.class);
+
+ private static BufferPool ourInstance = new BufferPool();
+
+ /**
+ * Use this method to get the instance of BufferPool.
+ *
+ * @return the instance of BufferPool
+ */
+ public static BufferPool getInstance() {
+ return ourInstance;
+ }
+
+ private BlockingQueue
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
+
+/**
+ * The input stream class is used for buffered files.
+ * The purpose of providing this class is to optimize buffer read performance.
+ */
+public class ByteBufferInputStream extends InputStream {
+ private ByteBuffer byteBuffer;
+ private boolean isClosed;
+
+ public ByteBufferInputStream(ByteBuffer byteBuffer) throws IOException {
+ if (null == byteBuffer) {
+ throw new IOException("byte buffer is null");
+ }
+ this.byteBuffer = byteBuffer;
+ this.isClosed = false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (null == this.byteBuffer) {
+ throw new IOException("this byte buffer for InputStream is null");
+ }
+ if (!this.byteBuffer.hasRemaining()) {
+ return -1;
+ }
+ return this.byteBuffer.get() & 0xFF;
+ }
+
+ @Override
+ public synchronized void mark(int readLimit) {
+ if (!this.markSupported()) {
+ return;
+ }
+ this.byteBuffer.mark();
+ // Parameter readLimit is ignored
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ if (this.isClosed) {
+ throw new IOException("Closed in InputStream");
+ }
+ try {
+ this.byteBuffer.reset();
+ } catch (InvalidMarkException e) {
+ throw new IOException("Invalid mark");
+ }
+ }
+
+ @Override
+ public int available() {
+ return this.byteBuffer.remaining();
+ }
+
+ @Override
+ public void close() {
+ this.byteBuffer.rewind();
+ this.byteBuffer = null;
+ this.isClosed = true;
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/ByteBufferOutputStream.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/ByteBufferOutputStream.java
new file mode 100644
index 0000000000..9e6a6fc5f3
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/ByteBufferOutputStream.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * The input stream class is used for buffered files.
+ * The purpose of providing this class is to optimize buffer write performance.
+ */
+public class ByteBufferOutputStream extends OutputStream {
+ private ByteBuffer byteBuffer;
+ private boolean isFlush;
+ private boolean isClosed;
+
+ public ByteBufferOutputStream(ByteBuffer byteBuffer) throws IOException {
+ if (null == byteBuffer) {
+ throw new IOException("byte buffer is null");
+ }
+ this.byteBuffer = byteBuffer;
+ this.byteBuffer.clear();
+ this.isFlush = false;
+ this.isClosed = false;
+ }
+
+ @Override
+ public void write(int b) {
+ byte[] singleBytes = new byte[1];
+ singleBytes[0] = (byte) b;
+ this.byteBuffer.put(singleBytes, 0, 1);
+ this.isFlush = false;
+ }
+
+ @Override
+ public void flush() {
+ if (this.isFlush) {
+ return;
+ }
+ this.isFlush = true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.isClosed) {
+ return;
+ }
+ if (null == this.byteBuffer) {
+ throw new IOException("Can not close a null object");
+ }
+
+ this.flush();
+ this.byteBuffer.flip();
+ this.byteBuffer = null;
+ this.isFlush = false;
+ this.isClosed = true;
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/ByteBufferWrapper.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/ByteBufferWrapper.java
new file mode 100644
index 0000000000..a7d1c5fffc
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/ByteBufferWrapper.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.util.CleanerUtil;
+
+/**
+ * The wrapper for memory buffers and disk buffers.
+ */
+public class ByteBufferWrapper {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ByteBufferWrapper.class);
+ private ByteBuffer byteBuffer;
+ private File file;
+ private RandomAccessFile randomAccessFile;
+
+ ByteBufferWrapper(ByteBuffer byteBuffer) {
+ this(byteBuffer, null, null);
+ }
+
+ ByteBufferWrapper(ByteBuffer byteBuffer, RandomAccessFile randomAccessFile,
+ File file) {
+ this.byteBuffer = byteBuffer;
+ this.file = file;
+ this.randomAccessFile = randomAccessFile;
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return this.byteBuffer;
+ }
+
+ boolean isDiskBuffer() {
+ return this.file != null && this.randomAccessFile != null;
+ }
+
+ private void munmap(MappedByteBuffer buffer) {
+ if (CleanerUtil.UNMAP_SUPPORTED) {
+ try {
+ CleanerUtil.getCleaner().freeBuffer(buffer);
+ } catch (IOException e) {
+ LOG.warn("Failed to unmap the buffer", e);
+ }
+ } else {
+ LOG.trace(CleanerUtil.UNMAP_NOT_SUPPORTED_REASON);
+ }
+ }
+
+ void close() throws IOException {
+ if (null != this.byteBuffer) {
+ this.byteBuffer.clear();
+ }
+
+ IOException exception = null;
+ // catch all exceptions, and try to free up resources that can be freed.
+ try {
+ if (null != randomAccessFile) {
+ this.randomAccessFile.close();
+ }
+ } catch (IOException e) {
+ LOG.error("Close the random access file occurs an exception.", e);
+ exception = e;
+ }
+
+ if (this.byteBuffer instanceof MappedByteBuffer) {
+ munmap((MappedByteBuffer) this.byteBuffer);
+ }
+
+ if (null != this.file && this.file.exists()) {
+ if (!this.file.delete()) {
+ LOG.warn("Delete the tmp file: [{}] failed.",
+ this.file.getAbsolutePath());
+ }
+ }
+
+ if (null != exception) {
+ throw exception;
+ }
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/Constants.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/Constants.java
new file mode 100644
index 0000000000..f67e07ecff
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/Constants.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+/**
+ * constant definition.
+ */
+public final class Constants {
+ private Constants() {
+ }
+
+ public static final String BLOCK_TMP_FILE_PREFIX = "cos_";
+ public static final String BLOCK_TMP_FILE_SUFFIX = "_local_block";
+
+ // The maximum number of files listed in a single COS list request.
+ public static final int COS_MAX_LISTING_LENGTH = 999;
+
+ // The maximum number of parts supported by a multipart uploading.
+ public static final int MAX_PART_NUM = 10000;
+
+ // The maximum size of a part
+ public static final long MAX_PART_SIZE = (long) 2 * Unit.GB;
+ // The minimum size of a part
+ public static final long MIN_PART_SIZE = (long) Unit.MB;
+
+ public static final String COSN_SECRET_ID_ENV = "COSN_SECRET_ID";
+ public static final String COSN_SECRET_KEY_ENV = "COSN_SECRET_KEY";
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosN.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosN.java
new file mode 100644
index 0000000000..990fcbd56b
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosN.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+/**
+ * CosN implementation for the Hadoop's AbstractFileSystem.
+ * This implementation delegates to the CosNFileSystem {@link CosNFileSystem}.
+ */
+public class CosN extends DelegateToFileSystem {
+ public CosN(URI theUri, Configuration conf)
+ throws IOException, URISyntaxException {
+ super(theUri, new CosNFileSystem(), conf, CosNFileSystem.SCHEME, false);
+ }
+
+ @Override
+ public int getUriDefaultPort() {
+ return -1;
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNConfigKeys.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNConfigKeys.java
new file mode 100644
index 0000000000..4d98d5f7e2
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNConfigKeys.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+
+/**
+ * This class contains constants for configuration keys used in COS.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CosNConfigKeys extends CommonConfigurationKeys {
+ public static final String USER_AGENT = "fs.cosn.user.agent";
+ public static final String DEFAULT_USER_AGENT = "cos-hadoop-plugin-v5.3";
+
+ public static final String COSN_CREDENTIALS_PROVIDER =
+ "fs.cosn.credentials.provider";
+ public static final String COSN_SECRET_ID_KEY = "fs.cosn.userinfo.secretId";
+ public static final String COSN_SECRET_KEY_KEY = "fs.cosn.userinfo.secretKey";
+ public static final String COSN_REGION_KEY = "fs.cosn.bucket.region";
+ public static final String COSN_ENDPOINT_SUFFIX_KEY =
+ "fs.cosn.bucket.endpoint_suffix";
+
+ public static final String COSN_USE_HTTPS_KEY = "fs.cosn.useHttps";
+ public static final boolean DEFAULT_USE_HTTPS = false;
+
+ public static final String COSN_BUFFER_DIR_KEY = "fs.cosn.tmp.dir";
+ public static final String DEFAULT_BUFFER_DIR = "/tmp/hadoop_cos";
+
+ public static final String COSN_UPLOAD_BUFFER_SIZE_KEY =
+ "fs.cosn.buffer.size";
+ public static final long DEFAULT_UPLOAD_BUFFER_SIZE = 32 * Unit.MB;
+
+ public static final String COSN_BLOCK_SIZE_KEY = "fs.cosn.block.size";
+ public static final long DEFAULT_BLOCK_SIZE = 8 * Unit.MB;
+
+ public static final String COSN_MAX_RETRIES_KEY = "fs.cosn.maxRetries";
+ public static final int DEFAULT_MAX_RETRIES = 3;
+ public static final String COSN_RETRY_INTERVAL_KEY =
+ "fs.cosn.retry.interval.seconds";
+ public static final long DEFAULT_RETRY_INTERVAL = 3;
+
+ public static final String UPLOAD_THREAD_POOL_SIZE_KEY =
+ "fs.cosn.upload_thread_pool";
+ public static final int DEFAULT_UPLOAD_THREAD_POOL_SIZE = 1;
+
+ public static final String COPY_THREAD_POOL_SIZE_KEY =
+ "fs.cosn.copy_thread_pool";
+ public static final int DEFAULT_COPY_THREAD_POOL_SIZE = 1;
+
+ /**
+ * This is the maximum time that excess idle threads will wait for new tasks
+ * before terminating. The time unit for it is second.
+ */
+ public static final String THREAD_KEEP_ALIVE_TIME_KEY =
+ "fs.cosn.threads.keep_alive_time";
+ // The default keep_alive_time is 60 seconds.
+ public static final long DEFAULT_THREAD_KEEP_ALIVE_TIME = 60L;
+
+ public static final String READ_AHEAD_BLOCK_SIZE_KEY =
+ "fs.cosn.read.ahead.block.size";
+ public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = 512 * Unit.KB;
+ public static final String READ_AHEAD_QUEUE_SIZE =
+ "fs.cosn.read.ahead.queue.size";
+ public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 5;
+
+ public static final String MAX_CONNECTION_NUM = "fs.cosn.max.connection.num";
+ public static final int DEFAULT_MAX_CONNECTION_NUM = 2048;
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNCopyFileContext.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNCopyFileContext.java
new file mode 100644
index 0000000000..39a2e91351
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNCopyFileContext.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The context of the copy task, including concurrency control,
+ * asynchronous acquisition of copy results and etc.
+ */
+public class CosNCopyFileContext {
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private Condition readyCondition = lock.newCondition();
+
+ private AtomicBoolean copySuccess = new AtomicBoolean(true);
+ private AtomicInteger copiesFinish = new AtomicInteger(0);
+
+ public void lock() {
+ this.lock.lock();
+ }
+
+ public void unlock() {
+ this.lock.unlock();
+ }
+
+ public void awaitAllFinish(int waitCopiesFinish) throws InterruptedException {
+ while (this.copiesFinish.get() != waitCopiesFinish) {
+ this.readyCondition.await();
+ }
+ }
+
+ public void signalAll() {
+ this.readyCondition.signalAll();
+ }
+
+ public boolean isCopySuccess() {
+ return this.copySuccess.get();
+ }
+
+ public void setCopySuccess(boolean copySuccess) {
+ this.copySuccess.set(copySuccess);
+ }
+
+ public void incCopiesFinish() {
+ this.copiesFinish.addAndGet(1);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNCopyFileTask.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNCopyFileTask.java
new file mode 100644
index 0000000000..33d38b80e2
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNCopyFileTask.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used by {@link CosNFileSystem} as an task that submitted
+ * to the thread pool to accelerate the copy progress.
+ * Each task is responsible for copying the source key to the destination.
+ */
+public class CosNCopyFileTask implements Runnable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CosNCopyFileTask.class);
+
+ private NativeFileSystemStore store;
+ private String srcKey;
+ private String dstKey;
+ private CosNCopyFileContext cosCopyFileContext;
+
+ public CosNCopyFileTask(NativeFileSystemStore store, String srcKey,
+ String dstKey, CosNCopyFileContext cosCopyFileContext) {
+ this.store = store;
+ this.srcKey = srcKey;
+ this.dstKey = dstKey;
+ this.cosCopyFileContext = cosCopyFileContext;
+ }
+
+ @Override
+ public void run() {
+ boolean fail = false;
+ LOG.info(Thread.currentThread().getName() + "copying...");
+ try {
+ this.store.copy(srcKey, dstKey);
+ } catch (IOException e) {
+ LOG.warn("Exception thrown when copy from {} to {}, exception:{}",
+ this.srcKey, this.dstKey, e);
+ fail = true;
+ } finally {
+ this.cosCopyFileContext.lock();
+ if (fail) {
+ cosCopyFileContext.setCopySuccess(false);
+ }
+ cosCopyFileContext.incCopiesFinish();
+ cosCopyFileContext.signalAll();
+ this.cosCopyFileContext.unlock();
+ }
+ }
+
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileReadTask.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileReadTask.java
new file mode 100644
index 0000000000..a5dcdda071
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileReadTask.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+/**
+ * Used by {@link CosNInputStream} as an asynchronous task
+ * submitted to the thread pool.
+ * Each task is responsible for reading a part of a large file.
+ * It is used to pre-read the data from COS to accelerate file reading process.
+ */
+public class CosNFileReadTask implements Runnable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CosNFileReadTask.class);
+
+ private final String key;
+ private final NativeFileSystemStore store;
+ private final CosNInputStream.ReadBuffer readBuffer;
+
+ private RetryPolicy retryPolicy;
+
+ public CosNFileReadTask(
+ Configuration conf,
+ String key, NativeFileSystemStore store,
+ CosNInputStream.ReadBuffer readBuffer) {
+ this.key = key;
+ this.store = store;
+ this.readBuffer = readBuffer;
+
+ RetryPolicy defaultPolicy =
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ conf.getInt(
+ CosNConfigKeys.COSN_MAX_RETRIES_KEY,
+ CosNConfigKeys.DEFAULT_MAX_RETRIES),
+ conf.getLong(
+ CosNConfigKeys.COSN_RETRY_INTERVAL_KEY,
+ CosNConfigKeys.DEFAULT_RETRY_INTERVAL),
+ TimeUnit.SECONDS);
+ Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The core CosN Filesystem implementation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class CosNFileSystem extends FileSystem {
+ static final Logger LOG = LoggerFactory.getLogger(CosNFileSystem.class);
+
+ public static final String SCHEME = "cosn";
+ public static final String PATH_DELIMITER = Path.SEPARATOR;
+
+ private URI uri;
+ private String bucket;
+ private NativeFileSystemStore store;
+ private Path workingDir;
+ private String owner = "Unknown";
+ private String group = "Unknown";
+
+ private ListeningExecutorService boundedIOThreadPool;
+ private ListeningExecutorService boundedCopyThreadPool;
+
+ public CosNFileSystem() {
+ }
+
+ public CosNFileSystem(NativeFileSystemStore store) {
+ this.store = store;
+ }
+
+ /**
+ * Return the protocol scheme for the FileSystem.
+ *
+ * @return
+ * If
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * The input stream for the COS blob store.
+ * Optimized sequential read flow based on a forward read-ahead queue
+ */
+public class CosNInputStream extends FSInputStream {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CosNInputStream.class);
+
+ /**
+ * This class is used by {@link CosNInputStream}
+ * and {@link CosNFileReadTask} to buffer data that read from COS blob store.
+ */
+ public static class ReadBuffer {
+ public static final int INIT = 1;
+ public static final int SUCCESS = 0;
+ public static final int ERROR = -1;
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private Condition readyCondition = lock.newCondition();
+
+ private byte[] buffer;
+ private int status;
+ private long start;
+ private long end;
+
+ public ReadBuffer(long start, long end) {
+ this.start = start;
+ this.end = end;
+ this.buffer = new byte[(int) (this.end - this.start) + 1];
+ this.status = INIT;
+ }
+
+ public void lock() {
+ this.lock.lock();
+ }
+
+ public void unLock() {
+ this.lock.unlock();
+ }
+
+ public void await(int waitStatus) throws InterruptedException {
+ while (this.status == waitStatus) {
+ readyCondition.await();
+ }
+ }
+
+ public void signalAll() {
+ readyCondition.signalAll();
+ }
+
+ public byte[] getBuffer() {
+ return this.buffer;
+ }
+
+ public int getStatus() {
+ return this.status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+ }
+
+ private FileSystem.Statistics statistics;
+ private final Configuration conf;
+ private final NativeFileSystemStore store;
+ private final String key;
+ private long position = 0;
+ private long nextPos = 0;
+ private long fileSize;
+ private long partRemaining;
+ private final long preReadPartSize;
+ private final int maxReadPartNumber;
+ private byte[] buffer;
+ private boolean closed;
+
+ private final ExecutorService readAheadExecutorService;
+ private final Queue
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.qcloud.cos.model.PartETag;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The output stream for the COS blob store.
+ * Implement streaming upload to COS based on the multipart upload function.
+ * ( the maximum size of each part is 5GB)
+ * Support up to 40TB single file by multipart upload (each part is 5GB).
+ * Improve the upload performance of writing large files by using byte buffers
+ * and a fixed thread pool.
+ */
+public class CosNOutputStream extends OutputStream {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CosNOutputStream.class);
+
+ private final Configuration conf;
+ private final NativeFileSystemStore store;
+ private MessageDigest digest;
+ private long blockSize;
+ private String key;
+ private int currentBlockId = 0;
+ private Set
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.qcloud.cos.auth.COSCredentialsProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.cosn.auth.COSCredentialProviderList;
+import org.apache.hadoop.fs.cosn.auth.EnvironmentVariableCredentialProvider;
+import org.apache.hadoop.fs.cosn.auth.SimpleCredentialProvider;
+
+/**
+ * Utility methods for CosN code.
+ */
+public final class CosNUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(CosNUtils.class);
+
+ static final String INSTANTIATION_EXCEPTION
+ = "instantiation exception";
+ static final String NOT_COS_CREDENTIAL_PROVIDER
+ = "is not cos credential provider";
+ static final String ABSTRACT_CREDENTIAL_PROVIDER
+ = "is abstract and therefore cannot be created";
+
+ private CosNUtils() {
+ }
+
+ public static COSCredentialProviderList createCosCredentialsProviderSet(
+ Configuration conf) throws IOException {
+ COSCredentialProviderList credentialProviderList =
+ new COSCredentialProviderList();
+
+ Class>[] cosClasses = CosNUtils.loadCosProviderClasses(
+ conf,
+ CosNConfigKeys.COSN_CREDENTIALS_PROVIDER);
+ if (0 == cosClasses.length) {
+ credentialProviderList.add(new SimpleCredentialProvider(conf));
+ credentialProviderList.add(new EnvironmentVariableCredentialProvider());
+ } else {
+ for (Class> credClass : cosClasses) {
+ credentialProviderList.add(createCOSCredentialProvider(
+ conf,
+ credClass));
+ }
+ }
+
+ return credentialProviderList;
+ }
+
+ public static Class>[] loadCosProviderClasses(
+ Configuration conf,
+ String key,
+ Class>... defaultValue) throws IOException {
+ try {
+ return conf.getClasses(key, defaultValue);
+ } catch (RuntimeException e) {
+ Throwable c = e.getCause() != null ? e.getCause() : e;
+ throw new IOException("From option " + key + ' ' + c, c);
+ }
+ }
+
+ public static COSCredentialsProvider createCOSCredentialProvider(
+ Configuration conf,
+ Class> credClass) throws IOException {
+ COSCredentialsProvider credentialsProvider;
+ if (!COSCredentialsProvider.class.isAssignableFrom(credClass)) {
+ throw new IllegalArgumentException(
+ "class " + credClass + " " + NOT_COS_CREDENTIAL_PROVIDER);
+ }
+ if (Modifier.isAbstract(credClass.getModifiers())) {
+ throw new IllegalArgumentException(
+ "class " + credClass + " " + ABSTRACT_CREDENTIAL_PROVIDER);
+ }
+ LOG.debug("Credential Provider class: " + credClass.getName());
+
+ try {
+ // new credClass()
+ Constructor constructor = getConstructor(credClass);
+ if (constructor != null) {
+ credentialsProvider =
+ (COSCredentialsProvider) constructor.newInstance();
+ return credentialsProvider;
+ }
+ // new credClass(conf)
+ constructor = getConstructor(credClass, Configuration.class);
+ if (null != constructor) {
+ credentialsProvider =
+ (COSCredentialsProvider) constructor.newInstance(conf);
+ return credentialsProvider;
+ }
+
+ Method factory = getFactoryMethod(
+ credClass, COSCredentialsProvider.class, "getInstance");
+ if (null != factory) {
+ credentialsProvider = (COSCredentialsProvider) factory.invoke(null);
+ return credentialsProvider;
+ }
+
+ throw new IllegalArgumentException(
+ "Not supported constructor or factory method found"
+ );
+
+ } catch (IllegalAccessException e) {
+ throw new IOException(
+ credClass.getName() + " " + INSTANTIATION_EXCEPTION + ": " + e, e);
+ } catch (InstantiationException e) {
+ throw new IOException(
+ credClass.getName() + " " + INSTANTIATION_EXCEPTION + ": " + e, e);
+ } catch (InvocationTargetException e) {
+ Throwable targetException = e.getTargetException();
+ if (targetException == null) {
+ targetException = e;
+ }
+ throw new IOException(
+ credClass.getName() + " " + INSTANTIATION_EXCEPTION + ": "
+ + targetException, targetException);
+ }
+ }
+
+ private static Constructor> getConstructor(Class> cl, Class>... args) {
+ try {
+ Constructor constructor = cl.getDeclaredConstructor(args);
+ return Modifier.isPublic(constructor.getModifiers()) ? constructor : null;
+ } catch (NoSuchMethodException e) {
+ return null;
+ }
+ }
+
+ private static Method getFactoryMethod(
+ Class> cl, Class> returnType, String methodName) {
+ try {
+ Method m = cl.getDeclaredMethod(methodName);
+ if (Modifier.isPublic(m.getModifiers())
+ && Modifier.isStatic(m.getModifiers())
+ && returnType.isAssignableFrom(m.getReturnType())) {
+ return m;
+ } else {
+ return null;
+ }
+ } catch (NoSuchMethodException e) {
+ return null;
+ }
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNativeFileSystemStore.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNativeFileSystemStore.java
new file mode 100644
index 0000000000..833f42d7be
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNativeFileSystemStore.java
@@ -0,0 +1,768 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.ClientConfig;
+import com.qcloud.cos.auth.BasicCOSCredentials;
+import com.qcloud.cos.auth.COSCredentials;
+import com.qcloud.cos.exception.CosClientException;
+import com.qcloud.cos.exception.CosServiceException;
+import com.qcloud.cos.http.HttpProtocol;
+import com.qcloud.cos.model.AbortMultipartUploadRequest;
+import com.qcloud.cos.model.COSObject;
+import com.qcloud.cos.model.COSObjectSummary;
+import com.qcloud.cos.model.CompleteMultipartUploadRequest;
+import com.qcloud.cos.model.CompleteMultipartUploadResult;
+import com.qcloud.cos.model.CopyObjectRequest;
+import com.qcloud.cos.model.DeleteObjectRequest;
+import com.qcloud.cos.model.GetObjectMetadataRequest;
+import com.qcloud.cos.model.GetObjectRequest;
+import com.qcloud.cos.model.InitiateMultipartUploadRequest;
+import com.qcloud.cos.model.InitiateMultipartUploadResult;
+import com.qcloud.cos.model.ListObjectsRequest;
+import com.qcloud.cos.model.ObjectListing;
+import com.qcloud.cos.model.ObjectMetadata;
+import com.qcloud.cos.model.PartETag;
+import com.qcloud.cos.model.PutObjectRequest;
+import com.qcloud.cos.model.PutObjectResult;
+import com.qcloud.cos.model.UploadPartRequest;
+import com.qcloud.cos.model.UploadPartResult;
+import com.qcloud.cos.region.Region;
+import com.qcloud.cos.utils.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.cosn.auth.COSCredentialProviderList;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.http.HttpStatus;
+
+/**
+ * The class actually performs access operation to the COS blob store.
+ * It provides the bridging logic for the Hadoop's abstract filesystem and COS.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class CosNativeFileSystemStore implements NativeFileSystemStore {
+ private COSClient cosClient;
+ private String bucketName;
+ private int maxRetryTimes;
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(CosNativeFileSystemStore.class);
+
+ /**
+ * Initialize the client to access COS blob storage.
+ *
+ * @param conf Hadoop configuration with COS configuration options.
+ * @throws IOException Initialize the COS client failed,
+ * caused by incorrect options.
+ */
+ private void initCOSClient(Configuration conf) throws IOException {
+ COSCredentialProviderList credentialProviderList =
+ CosNUtils.createCosCredentialsProviderSet(conf);
+ String region = conf.get(CosNConfigKeys.COSN_REGION_KEY);
+ String endpointSuffix = conf.get(
+ CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY);
+ if (null == region && null == endpointSuffix) {
+ String exceptionMsg = String.format("config %s and %s at least one",
+ CosNConfigKeys.COSN_REGION_KEY,
+ CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY);
+ throw new IOException(exceptionMsg);
+ }
+
+ COSCredentials cosCred;
+ cosCred = new BasicCOSCredentials(
+ credentialProviderList.getCredentials().getCOSAccessKeyId(),
+ credentialProviderList.getCredentials().getCOSSecretKey());
+
+ boolean useHttps = conf.getBoolean(CosNConfigKeys.COSN_USE_HTTPS_KEY,
+ CosNConfigKeys.DEFAULT_USE_HTTPS);
+
+ ClientConfig config;
+ if (null == region) {
+ config = new ClientConfig(new Region(""));
+ config.setEndPointSuffix(endpointSuffix);
+ } else {
+ config = new ClientConfig(new Region(region));
+ }
+ if (useHttps) {
+ config.setHttpProtocol(HttpProtocol.https);
+ }
+
+ config.setUserAgent(conf.get(CosNConfigKeys.USER_AGENT,
+ CosNConfigKeys.DEFAULT_USER_AGENT) + " For " + " Hadoop "
+ + VersionInfo.getVersion());
+
+ this.maxRetryTimes = conf.getInt(CosNConfigKeys.COSN_MAX_RETRIES_KEY,
+ CosNConfigKeys.DEFAULT_MAX_RETRIES);
+
+ config.setMaxConnectionsCount(
+ conf.getInt(CosNConfigKeys.MAX_CONNECTION_NUM,
+ CosNConfigKeys.DEFAULT_MAX_CONNECTION_NUM));
+
+ this.cosClient = new COSClient(cosCred, config);
+ }
+
+ /**
+ * Initialize the CosNativeFileSystemStore object, including
+ * its COS client and default COS bucket.
+ *
+ * @param uri The URI of the COS bucket accessed by default.
+ * @param conf Hadoop configuration with COS configuration options.
+ * @throws IOException Initialize the COS client failed.
+ */
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ try {
+ initCOSClient(conf);
+ this.bucketName = uri.getHost();
+ } catch (Exception e) {
+ handleException(e, "");
+ }
+ }
+
+ /**
+ * Store a file into COS from the specified input stream, which would be
+ * retried until the success or maximum number.
+ *
+ * @param key COS object key.
+ * @param inputStream Input stream to be uploaded into COS.
+ * @param md5Hash MD5 value of the content to be uploaded.
+ * @param length Length of uploaded content.
+ * @throws IOException Upload the file failed.
+ */
+ private void storeFileWithRetry(String key, InputStream inputStream,
+ byte[] md5Hash, long length) throws IOException {
+ try {
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentMD5(Base64.encodeAsString(md5Hash));
+ objectMetadata.setContentLength(length);
+ PutObjectRequest putObjectRequest =
+ new PutObjectRequest(bucketName, key, inputStream, objectMetadata);
+
+ PutObjectResult putObjectResult =
+ (PutObjectResult) callCOSClientWithRetry(putObjectRequest);
+ LOG.debug("Store file successfully. COS key: [{}], ETag: [{}], "
+ + "MD5: [{}].", key, putObjectResult.getETag(), new String(md5Hash));
+ } catch (Exception e) {
+ String errMsg = String.format("Store file failed. COS key: [%s], "
+ + "exception: [%s]", key, e.toString());
+ LOG.error(errMsg);
+ handleException(new Exception(errMsg), key);
+ }
+ }
+
+ /**
+ * Store a local file into COS.
+ *
+ * @param key COS object key.
+ * @param file The local file to be uploaded.
+ * @param md5Hash The MD5 value of the file to be uploaded.
+ * @throws IOException Upload the file failed.
+ */
+ @Override
+ public void storeFile(String key, File file, byte[] md5Hash)
+ throws IOException {
+ LOG.info("Store file from local path: [{}]. file length: [{}] COS key: " +
+ "[{}] MD5: [{}].", file.getCanonicalPath(), file.length(), key,
+ new String(md5Hash));
+ storeFileWithRetry(key, new BufferedInputStream(new FileInputStream(file)),
+ md5Hash, file.length());
+ }
+
+ /**
+ * Store a file into COS from the specified input stream.
+ *
+ * @param key COS object key.
+ * @param inputStream The Input stream to be uploaded.
+ * @param md5Hash The MD5 value of the content to be uploaded.
+ * @param contentLength Length of uploaded content.
+ * @throws IOException Upload the file failed.
+ */
+ @Override
+ public void storeFile(
+ String key,
+ InputStream inputStream,
+ byte[] md5Hash,
+ long contentLength) throws IOException {
+ LOG.info("Store file from input stream. COS key: [{}], "
+ + "length: [{}], MD5: [{}].", key, contentLength, md5Hash);
+ storeFileWithRetry(key, inputStream, md5Hash, contentLength);
+ }
+
+ // For cos, storeEmptyFile means creating a directory
+ @Override
+ public void storeEmptyFile(String key) throws IOException {
+ if (!key.endsWith(CosNFileSystem.PATH_DELIMITER)) {
+ key = key + CosNFileSystem.PATH_DELIMITER;
+ }
+
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(0);
+ InputStream input = new ByteArrayInputStream(new byte[0]);
+ PutObjectRequest putObjectRequest =
+ new PutObjectRequest(bucketName, key, input, objectMetadata);
+ try {
+ PutObjectResult putObjectResult =
+ (PutObjectResult) callCOSClientWithRetry(putObjectRequest);
+ LOG.debug("Store empty file successfully. COS key: [{}], ETag: [{}].",
+ key, putObjectResult.getETag());
+ } catch (Exception e) {
+ String errMsg = String.format("Store empty file failed. "
+ + "COS key: [%s], exception: [%s]", key, e.toString());
+ LOG.error(errMsg);
+ handleException(new Exception(errMsg), key);
+ }
+ }
+
+ public PartETag uploadPart(File file, String key, String uploadId,
+ int partNum) throws IOException {
+ InputStream inputStream = new FileInputStream(file);
+ return uploadPart(inputStream, key, uploadId, partNum, file.length());
+ }
+
+ @Override
+ public PartETag uploadPart(InputStream inputStream, String key,
+ String uploadId, int partNum, long partSize) throws IOException {
+ UploadPartRequest uploadPartRequest = new UploadPartRequest();
+ uploadPartRequest.setBucketName(this.bucketName);
+ uploadPartRequest.setUploadId(uploadId);
+ uploadPartRequest.setInputStream(inputStream);
+ uploadPartRequest.setPartNumber(partNum);
+ uploadPartRequest.setPartSize(partSize);
+ uploadPartRequest.setKey(key);
+
+ try {
+ UploadPartResult uploadPartResult =
+ (UploadPartResult) callCOSClientWithRetry(uploadPartRequest);
+ return uploadPartResult.getPartETag();
+ } catch (Exception e) {
+ String errMsg = String.format("Current thread: [%d], COS key: [%s], "
+ + "upload id: [%s], part num: [%d], exception: [%s]",
+ Thread.currentThread().getId(), key, uploadId, partNum, e.toString());
+ handleException(new Exception(errMsg), key);
+ }
+
+ return null;
+ }
+
+ public void abortMultipartUpload(String key, String uploadId) {
+ LOG.info("Abort the multipart upload. COS key: [{}], upload id: [{}].",
+ key, uploadId);
+ AbortMultipartUploadRequest abortMultipartUploadRequest =
+ new AbortMultipartUploadRequest(bucketName, key, uploadId);
+ cosClient.abortMultipartUpload(abortMultipartUploadRequest);
+ }
+
+ /**
+ * Initialize a multipart upload and return the upload id.
+ *
+ * @param key The COS object key initialized to multipart upload.
+ * @return The multipart upload id.
+ */
+ public String getUploadId(String key) {
+ if (null == key || key.length() == 0) {
+ return "";
+ }
+
+ LOG.info("Initiate a multipart upload. bucket: [{}], COS key: [{}].",
+ bucketName, key);
+ InitiateMultipartUploadRequest initiateMultipartUploadRequest =
+ new InitiateMultipartUploadRequest(bucketName, key);
+ InitiateMultipartUploadResult initiateMultipartUploadResult =
+ cosClient.initiateMultipartUpload(initiateMultipartUploadRequest);
+ return initiateMultipartUploadResult.getUploadId();
+ }
+
+ /**
+ * Finish a multipart upload process, which will merge all parts uploaded.
+ *
+ * @param key The COS object key to be finished.
+ * @param uploadId The upload id of the multipart upload to be finished.
+ * @param partETagList The etag list of the part that has been uploaded.
+ * @return The result object of completing the multipart upload process.
+ */
+ public CompleteMultipartUploadResult completeMultipartUpload(
+ String key, String uploadId, List
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ *
+ * Holds basic metadata for a file stored in a {@link NativeFileSystemStore}.
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+
+import com.qcloud.cos.model.CompleteMultipartUploadResult;
+import com.qcloud.cos.model.PartETag;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *
+ * An abstraction for a key-based {@link File} store.
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ *
+ * Holds information on a directory listing for a
+ * {@link NativeFileSystemStore}.
+ * This includes the {@link FileMetadata files} and directories
+ * (their names) contained in a directory.
+ *
+ * This listing may be returned in chunks, so a
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+/**
+ * Constant definition of storage unit.
+ */
+public final class Unit {
+ private Unit() {
+ }
+
+ public static final int KB = 1024;
+ public static final int MB = 1024 * KB;
+ public static final int GB = 1024 * MB;
+ public static final long TB = (long) 1024 * GB;
+ public static final long PB = (long) 1024 * TB;
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/COSCredentialProviderList.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/COSCredentialProviderList.java
new file mode 100644
index 0000000000..e900b997e4
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/COSCredentialProviderList.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.auth;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import com.qcloud.cos.auth.AnonymousCOSCredentials;
+import com.qcloud.cos.auth.COSCredentials;
+import com.qcloud.cos.auth.COSCredentialsProvider;
+import com.qcloud.cos.exception.CosClientException;
+import com.qcloud.cos.utils.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * a list of cos credentials provider.
+ */
+public class COSCredentialProviderList implements
+ COSCredentialsProvider, AutoCloseable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(COSCredentialProviderList.class);
+
+ private static final String NO_COS_CREDENTIAL_PROVIDERS =
+ "No COS Credential Providers";
+ private static final String CREDENTIALS_REQUESTED_WHEN_CLOSED =
+ "Credentials requested after provider list was closed";
+
+ private final List
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.auth;
+
+import com.qcloud.cos.auth.BasicCOSCredentials;
+import com.qcloud.cos.auth.COSCredentials;
+import com.qcloud.cos.auth.COSCredentialsProvider;
+import com.qcloud.cos.exception.CosClientException;
+import com.qcloud.cos.utils.StringUtils;
+
+import org.apache.hadoop.fs.cosn.Constants;
+
+/**
+ * the provider obtaining the cos credentials from the environment variables.
+ */
+public class EnvironmentVariableCredentialProvider
+ implements COSCredentialsProvider {
+ @Override
+ public COSCredentials getCredentials() {
+ String secretId = System.getenv(Constants.COSN_SECRET_ID_ENV);
+ String secretKey = System.getenv(Constants.COSN_SECRET_KEY_ENV);
+
+ secretId = StringUtils.trim(secretId);
+ secretKey = StringUtils.trim(secretKey);
+
+ if (!StringUtils.isNullOrEmpty(secretId)
+ && !StringUtils.isNullOrEmpty(secretKey)) {
+ return new BasicCOSCredentials(secretId, secretKey);
+ } else {
+ throw new CosClientException(
+ "Unable to load COS credentials from environment variables" +
+ "(COS_SECRET_ID or COS_SECRET_KEY)");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "EnvironmentVariableCredentialProvider{}";
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/NoAuthWithCOSException.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/NoAuthWithCOSException.java
new file mode 100644
index 0000000000..fa188bfb3f
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/NoAuthWithCOSException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.auth;
+
+import com.qcloud.cos.exception.CosClientException;
+
+/**
+ * Exception thrown when no credentials can be obtained.
+ */
+public class NoAuthWithCOSException extends CosClientException {
+ public NoAuthWithCOSException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public NoAuthWithCOSException(String message) {
+ super(message);
+ }
+
+ public NoAuthWithCOSException(Throwable t) {
+ super(t);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/SimpleCredentialProvider.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/SimpleCredentialProvider.java
new file mode 100644
index 0000000000..f0635fc0d0
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/SimpleCredentialProvider.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.auth;
+
+import com.qcloud.cos.auth.BasicCOSCredentials;
+import com.qcloud.cos.auth.COSCredentials;
+import com.qcloud.cos.auth.COSCredentialsProvider;
+import com.qcloud.cos.exception.CosClientException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.cosn.CosNConfigKeys;
+
+/**
+ * Get the credentials from the hadoop configuration.
+ */
+public class SimpleCredentialProvider implements COSCredentialsProvider {
+ private String secretId;
+ private String secretKey;
+
+ public SimpleCredentialProvider(Configuration conf) {
+ this.secretId = conf.get(
+ CosNConfigKeys.COSN_SECRET_ID_KEY
+ );
+ this.secretKey = conf.get(
+ CosNConfigKeys.COSN_SECRET_KEY_KEY
+ );
+ }
+
+ @Override
+ public COSCredentials getCredentials() {
+ if (!StringUtils.isEmpty(this.secretId)
+ && !StringUtils.isEmpty(this.secretKey)) {
+ return new BasicCOSCredentials(this.secretId, this.secretKey);
+ }
+ throw new CosClientException("secret id or secret key is unset");
+ }
+
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/package-info.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/package-info.java
new file mode 100644
index 0000000000..4b6f8cf48b
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/auth/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.auth;
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/package-info.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/package-info.java
new file mode 100644
index 0000000000..b46608239a
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/CosNTestConfigKey.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/CosNTestConfigKey.java
new file mode 100644
index 0000000000..4d5ee4814c
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/CosNTestConfigKey.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+/**
+ * Configuration options for the CosN file system for testing.
+ */
+public final class CosNTestConfigKey {
+ private CosNTestConfigKey() {
+ }
+
+ public static final String TEST_COS_FILESYSTEM_CONF_KEY =
+ "test.fs.cosn.name";
+ public static final String DEFAULT_TEST_COS_FILESYSTEM_CONF_VALUE =
+ "";
+ public static final String TEST_UNIQUE_FORK_ID_KEY =
+ "test.unique.fork.id";
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/CosNTestUtils.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/CosNTestUtils.java
new file mode 100644
index 0000000000..8afce51b17
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/CosNTestUtils.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.internal.AssumptionViolatedException;
+
+/**
+ * Utilities for the CosN tests.
+ */
+public final class CosNTestUtils {
+
+ private CosNTestUtils() {
+ }
+
+ /**
+ * Create the file system for test.
+ *
+ * @param configuration hadoop's configuration
+ * @return The file system for test
+ * @throws IOException If fail to create or initialize the file system.
+ */
+ public static CosNFileSystem createTestFileSystem(
+ Configuration configuration) throws IOException {
+ String fsName = configuration.getTrimmed(
+ CosNTestConfigKey.TEST_COS_FILESYSTEM_CONF_KEY,
+ CosNTestConfigKey.DEFAULT_TEST_COS_FILESYSTEM_CONF_VALUE);
+
+ boolean liveTest = StringUtils.isNotEmpty(fsName);
+ URI testUri;
+ if (liveTest) {
+ testUri = URI.create(fsName);
+ liveTest = testUri.getScheme().equals(CosNFileSystem.SCHEME);
+ } else {
+ throw new AssumptionViolatedException("no test file system in " +
+ fsName);
+ }
+
+ CosNFileSystem cosFs = new CosNFileSystem();
+ cosFs.initialize(testUri, configuration);
+ return cosFs;
+ }
+
+ /**
+ * Create a dir path for test.
+ * The value of {@link CosNTestConfigKey#TEST_UNIQUE_FORK_ID_KEY}
+ * will be used if it is set.
+ *
+ * @param defVal default value
+ * @return The test path
+ */
+ public static Path createTestPath(Path defVal) {
+ String testUniqueForkId = System.getProperty(
+ CosNTestConfigKey.TEST_UNIQUE_FORK_ID_KEY);
+ return testUniqueForkId ==
+ null ? defVal : new Path("/" + testUniqueForkId, "test");
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/TestCosNInputStream.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/TestCosNInputStream.java
new file mode 100644
index 0000000000..79884bad07
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/TestCosNInputStream.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * CosNInputStream Tester.
+ */
+public class TestCosNInputStream {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestCosNInputStream.class);
+
+ private FileSystem fs;
+
+ private Path testRootDir;
+
+ @Before
+ public void setUp() throws IOException {
+ Configuration configuration = new Configuration();
+ this.fs = CosNTestUtils.createTestFileSystem(configuration);
+ this.testRootDir = CosNTestUtils.createTestPath(new Path("/test"));
+ LOG.info("test root dir: " + this.testRootDir);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (null != this.fs) {
+ this.fs.delete(this.testRootDir, true);
+ }
+ }
+
+ /**
+ * Method: seek(long pos).
+ */
+ @Test
+ public void testSeek() throws Exception {
+ Path seekTestFilePath = new Path(this.testRootDir + "/"
+ + "seekTestFile");
+ long fileSize = 5 * Unit.MB;
+
+ ContractTestUtils.generateTestFile(
+ this.fs, seekTestFilePath, fileSize, 256, 255);
+ LOG.info("5MB file for seek test has created.");
+
+ FSDataInputStream inputStream = this.fs.open(seekTestFilePath);
+ int seekTimes = 5;
+ for (int i = 0; i != seekTimes; i++) {
+ long pos = fileSize / (seekTimes - i) - 1;
+ inputStream.seek(pos);
+ assertTrue("expected position at: " +
+ pos + ", but got: " + inputStream.getPos(),
+ inputStream.getPos() == pos);
+ LOG.info("completed seeking at pos: " + inputStream.getPos());
+ }
+ LOG.info("begin to random position seeking test...");
+ Random random = new Random();
+ for (int i = 0; i < seekTimes; i++) {
+ long pos = Math.abs(random.nextLong()) % fileSize;
+ LOG.info("seeking for pos: " + pos);
+ inputStream.seek(pos);
+ assertTrue("expected position at: " +
+ pos + ", but got: " + inputStream.getPos(),
+ inputStream.getPos() == pos);
+ LOG.info("completed seeking at pos: " + inputStream.getPos());
+ }
+ }
+
+ /**
+ * Method: getPos().
+ */
+ @Test
+ public void testGetPos() throws Exception {
+ Path seekTestFilePath = new Path(this.testRootDir + "/" +
+ "seekTestFile");
+ long fileSize = 5 * Unit.MB;
+ ContractTestUtils.generateTestFile(
+ this.fs, seekTestFilePath, fileSize, 256, 255);
+ LOG.info("5MB file for getPos test has created.");
+
+ FSDataInputStream inputStream = this.fs.open(seekTestFilePath);
+ Random random = new Random();
+ long pos = Math.abs(random.nextLong()) % fileSize;
+ inputStream.seek(pos);
+ assertTrue("expected position at: " +
+ pos + ", but got: " + inputStream.getPos(),
+ inputStream.getPos() == pos);
+ LOG.info("completed get pos tests.");
+ }
+
+ /**
+ * Method: seekToNewSource(long targetPos).
+ */
+ @Ignore("Not ready yet")
+ public void testSeekToNewSource() throws Exception {
+ LOG.info("Currently it is not supported to " +
+ "seek the offset in a new source.");
+ }
+
+ /**
+ * Method: read().
+ */
+ @Test
+ public void testRead() throws Exception {
+ final int bufLen = 256;
+ Path readTestFilePath = new Path(this.testRootDir + "/"
+ + "testReadSmallFile.txt");
+ long fileSize = 5 * Unit.MB;
+
+ ContractTestUtils.generateTestFile(
+ this.fs, readTestFilePath, fileSize, 256, 255);
+ LOG.info("read test file: " + readTestFilePath + " has created.");
+
+ FSDataInputStream inputStream = this.fs.open(readTestFilePath);
+ byte[] buf = new byte[bufLen];
+ long bytesRead = 0;
+ while (bytesRead < fileSize) {
+ int bytes = 0;
+ if (fileSize - bytesRead < bufLen) {
+ int remaining = (int) (fileSize - bytesRead);
+ bytes = inputStream.read(buf, 0, remaining);
+ } else {
+ bytes = inputStream.read(buf, 0, bufLen);
+ }
+ bytesRead += bytes;
+
+ if (bytesRead % (1 * Unit.MB) == 0) {
+ int available = inputStream.available();
+ assertTrue("expected remaining: " + (fileSize - bytesRead) +
+ " but got: " + available, (fileSize - bytesRead) == available);
+ LOG.info("Bytes read: " +
+ Math.round((double) bytesRead / Unit.MB) + "MB");
+ }
+ }
+
+ assertTrue(inputStream.available() == 0);
+ IOUtils.closeStream(inputStream);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/TestCosNOutputStream.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/TestCosNOutputStream.java
new file mode 100644
index 0000000000..7fd88976d6
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/TestCosNOutputStream.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+
+/**
+ * CosNOutputStream Tester.
+ *
+ * If the test.fs.cosn.name property is not set, all test case will fail.
+ */
+public class TestCosNOutputStream {
+ private FileSystem fs;
+ private Path testRootDir;
+
+ @Rule
+ public Timeout timeout = new Timeout(3600 * 1000);
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.setInt(
+ CosNConfigKeys.COSN_BLOCK_SIZE_KEY, 2 * Unit.MB);
+ configuration.setLong(
+ CosNConfigKeys.COSN_UPLOAD_BUFFER_SIZE_KEY,
+ CosNConfigKeys.DEFAULT_UPLOAD_BUFFER_SIZE);
+ this.fs = CosNTestUtils.createTestFileSystem(configuration);
+ this.testRootDir = new Path("/test");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testEmptyFileUpload() throws IOException {
+ ContractTestUtils.createAndVerifyFile(this.fs, this.testRootDir, 0);
+ }
+
+ @Test
+ public void testSingleFileUpload() throws IOException {
+ ContractTestUtils.createAndVerifyFile(
+ this.fs, this.testRootDir, 1 * Unit.MB - 1);
+ ContractTestUtils.createAndVerifyFile(
+ this.fs, this.testRootDir, 1 * Unit.MB);
+ ContractTestUtils.createAndVerifyFile(
+ this.fs, this.testRootDir, 2 * Unit.MB - 1);
+ }
+
+ @Test
+ public void testLargeFileUpload() throws IOException {
+ ContractTestUtils.createAndVerifyFile(
+ this.fs, this.testRootDir, 2 * Unit.MB);
+ ContractTestUtils.createAndVerifyFile(
+ this.fs, this.testRootDir, 2 * Unit.MB + 1);
+ ContractTestUtils.createAndVerifyFile(
+ this.fs, this.testRootDir, 100 * Unit.MB);
+ // In principle, a maximum boundary test (file size: 2MB * 10000 - 1)
+ // should be provided here,
+ // but it is skipped due to network bandwidth and test time constraints.
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/CosNContract.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/CosNContract.java
new file mode 100644
index 0000000000..cd4097951d
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/CosNContract.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
+import org.apache.hadoop.fs.cosn.CosNFileSystem;
+import org.apache.hadoop.fs.cosn.CosNTestUtils;
+
+/**
+ * The contract of CosN: only enabled if the test bucket is provided.
+ */
+public class CosNContract extends AbstractBondedFSContract {
+ private static final String CONTRACT_XML = "contract/cosn.xml";
+
+ protected CosNContract(Configuration conf) {
+ super(conf);
+ addConfResource(CONTRACT_XML);
+ }
+
+ @Override
+ public String getScheme() {
+ return CosNFileSystem.SCHEME;
+ }
+
+ @Override
+ public Path getTestPath() {
+ return CosNTestUtils.createTestPath(super.getTestPath());
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractCreate.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractCreate.java
new file mode 100644
index 0000000000..9488bd469a
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractCreate.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * CosN contract tests for creating files.
+ */
+public class TestCosNContractCreate extends AbstractContractCreateTest {
+ @Override
+ protected AbstractFSContract createContract(Configuration configuration) {
+ return new CosNContract(configuration);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractDelete.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractDelete.java
new file mode 100644
index 0000000000..1c23ac2184
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractDelete.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * CosN contract tests for deleting files.
+ */
+public class TestCosNContractDelete extends AbstractContractDeleteTest {
+ @Override
+ protected AbstractFSContract createContract(Configuration configuration) {
+ return new CosNContract(configuration);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractDistCp.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractDistCp.java
new file mode 100644
index 0000000000..75ac53b9de
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractDistCp.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.cosn.CosNConfigKeys;
+import org.apache.hadoop.fs.cosn.Unit;
+import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
+
+/**
+ * Contract test suit covering CosN integration with DistCp.
+ */
+public class TestCosNContractDistCp extends AbstractContractDistCpTest {
+
+ private static final int MULTIPART_SETTING = 2 * Unit.MB;
+ private static final long UPLOAD_BUFFER_POOL_SIZE = 5 * 2 * Unit.MB;
+ private static final int UPLOAD_THREAD_POOL_SIZE = 5;
+ private static final int COPY_THREAD_POOL_SIZE = 3;
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new CosNContract(conf);
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration newConf = super.createConfiguration();
+ newConf.setInt(CosNConfigKeys.COSN_BLOCK_SIZE_KEY,
+ MULTIPART_SETTING);
+ newConf.setLong(CosNConfigKeys.COSN_UPLOAD_BUFFER_SIZE_KEY,
+ UPLOAD_BUFFER_POOL_SIZE);
+ newConf.setInt(CosNConfigKeys.UPLOAD_THREAD_POOL_SIZE_KEY,
+ UPLOAD_THREAD_POOL_SIZE);
+ newConf.setInt(CosNConfigKeys.COPY_THREAD_POOL_SIZE_KEY,
+ COPY_THREAD_POOL_SIZE);
+ return newConf;
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractGetFileStatus.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractGetFileStatus.java
new file mode 100644
index 0000000000..9fba6eef44
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractGetFileStatus.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * CosN contract tests covering getFileStatus.
+ */
+public class TestCosNContractGetFileStatus
+ extends AbstractContractGetFileStatusTest {
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new CosNContract(conf);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractMkdir.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractMkdir.java
new file mode 100644
index 0000000000..e704e13df8
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractMkdir.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * CosN contract tests for making directories.
+ */
+public class TestCosNContractMkdir extends AbstractContractMkdirTest {
+ @Override
+ protected AbstractFSContract createContract(Configuration configuration) {
+ return new CosNContract(configuration);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractOpen.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractOpen.java
new file mode 100644
index 0000000000..1bb732b312
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractOpen.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * CosN contract tests for opening files.
+ */
+public class TestCosNContractOpen extends AbstractContractOpenTest {
+ @Override
+ protected AbstractFSContract createContract(Configuration configuration) {
+ return new CosNContract(configuration);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractRename.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractRename.java
new file mode 100644
index 0000000000..f82c8dfb4c
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractRename.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * CosN contract tests for renaming a file.
+ */
+public class TestCosNContractRename extends AbstractContractRenameTest {
+ @Override
+ protected AbstractFSContract createContract(Configuration configuration) {
+ return new CosNContract(configuration);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractRootDir.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractRootDir.java
new file mode 100644
index 0000000000..145aee9ca1
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractRootDir.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * root dir operations against an COS bucket.
+ */
+public class TestCosNContractRootDir
+ extends AbstractContractRootDirectoryTest {
+ @Override
+ protected AbstractFSContract createContract(Configuration configuration) {
+ return new CosNContract(configuration);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractSeek.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractSeek.java
new file mode 100644
index 0000000000..e3915676ee
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/TestCosNContractSeek.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * CosN contract tests for seeking a position in a file.
+ */
+public class TestCosNContractSeek extends AbstractContractSeekTest {
+ @Override
+ protected AbstractFSContract createContract(Configuration configuration) {
+ return new CosNContract(configuration);
+ }
+}
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/package-info.java b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/package-info.java
new file mode 100644
index 0000000000..97598b10bc
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/java/org/apache/hadoop/fs/cosn/contract/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.cosn.contract;
\ No newline at end of file
diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/test/resources/contract/cosn.xml b/hadoop-cloud-storage-project/hadoop-cos/src/test/resources/contract/cosn.xml
new file mode 100644
index 0000000000..ac4f58c066
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-cos/src/test/resources/contract/cosn.xml
@@ -0,0 +1,120 @@
+
+
+cosn
+ */
+ @Override
+ public String getScheme() {
+ return CosNFileSystem.SCHEME;
+ }
+
+ @Override
+ public void initialize(URI name, Configuration conf) throws IOException {
+ super.initialize(name, conf);
+ this.bucket = name.getHost();
+ if (this.store == null) {
+ this.store = createDefaultStore(conf);
+ }
+ this.store.initialize(name, conf);
+ setConf(conf);
+ this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
+ this.workingDir = new Path("/user",
+ System.getProperty("user.name")).makeQualified(
+ this.uri,
+ this.getWorkingDirectory());
+ this.owner = getOwnerId();
+ this.group = getGroupId();
+ LOG.debug("owner:" + owner + ", group:" + group);
+
+ BufferPool.getInstance().initialize(this.getConf());
+
+ // initialize the thread pool
+ int uploadThreadPoolSize = this.getConf().getInt(
+ CosNConfigKeys.UPLOAD_THREAD_POOL_SIZE_KEY,
+ CosNConfigKeys.DEFAULT_UPLOAD_THREAD_POOL_SIZE
+ );
+ int readAheadPoolSize = this.getConf().getInt(
+ CosNConfigKeys.READ_AHEAD_QUEUE_SIZE,
+ CosNConfigKeys.DEFAULT_READ_AHEAD_QUEUE_SIZE
+ );
+ int ioThreadPoolSize = uploadThreadPoolSize + readAheadPoolSize / 3;
+ long threadKeepAlive = this.getConf().getLong(
+ CosNConfigKeys.THREAD_KEEP_ALIVE_TIME_KEY,
+ CosNConfigKeys.DEFAULT_THREAD_KEEP_ALIVE_TIME
+ );
+ this.boundedIOThreadPool = BlockingThreadPoolExecutorService.newInstance(
+ ioThreadPoolSize / 2, ioThreadPoolSize,
+ threadKeepAlive, TimeUnit.SECONDS,
+ "cos-transfer-thread-pool");
+ int copyThreadPoolSize = this.getConf().getInt(
+ CosNConfigKeys.COPY_THREAD_POOL_SIZE_KEY,
+ CosNConfigKeys.DEFAULT_COPY_THREAD_POOL_SIZE
+ );
+ this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance(
+ CosNConfigKeys.DEFAULT_COPY_THREAD_POOL_SIZE, copyThreadPoolSize,
+ 60L, TimeUnit.SECONDS,
+ "cos-copy-thread-pool");
+ }
+
+ private static NativeFileSystemStore createDefaultStore(Configuration conf) {
+ NativeFileSystemStore store = new CosNativeFileSystemStore();
+ RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ conf.getInt(CosNConfigKeys.COSN_MAX_RETRIES_KEY,
+ CosNConfigKeys.DEFAULT_MAX_RETRIES),
+ conf.getLong(CosNConfigKeys.COSN_RETRY_INTERVAL_KEY,
+ CosNConfigKeys.DEFAULT_RETRY_INTERVAL),
+ TimeUnit.SECONDS);
+ Mapf
is a file, this method will make a single call to COS.
+ * If f
is a directory,
+ * this method will make a maximum of ( n / 199) + 2 calls to cos,
+ * where n is the total number of files
+ * and directories contained directly in f
.
+ * priorLastKey
+ * is provided so that the next chunk may be requested.
+ *