HADOOP-11262. Enable YARN to use S3A. (Pieter Reuse via lei)
This commit is contained in:
parent
30c7dfd8ba
commit
126705f67e
@ -720,6 +720,8 @@ Release 2.8.0 - UNRELEASED
|
||||
HADOOP-12657. Add a option to skip newline on empty files with getMerge -nl.
|
||||
(Kanaka Kumar Avvaru via aajisaka)
|
||||
|
||||
HADOOP-11262. Enable YARN to use S3A. (Pieter Reuse via lei)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-12458. Retries is typoed to spell Retires in parts of
|
||||
|
@ -936,6 +936,12 @@ for ldap providers in the same way as above does.
|
||||
<description>The implementation class of the S3A Filesystem</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.AbstractFileSystem.s3a.impl</name>
|
||||
<value>org.apache.hadoop.fs.s3a.S3A</value>
|
||||
<description>The implementation class of the S3A AbstractFileSystem.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>io.seqfile.compress.blocksize</name>
|
||||
<value>1000000</value>
|
||||
|
@ -1166,7 +1166,7 @@ public void testUnsupportedSymlink() throws IOException {
|
||||
fc.createSymlink(file, link, false);
|
||||
Assert.fail("Created a symlink on a file system that "+
|
||||
"does not support symlinks.");
|
||||
} catch (IOException e) {
|
||||
} catch (UnsupportedOperationException e) {
|
||||
// Expected
|
||||
}
|
||||
createFile(file);
|
||||
|
@ -128,6 +128,22 @@
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-tests</artifactId>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-examples</artifactId>
|
||||
<scope>test</scope>
|
||||
<type>jar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
@ -113,4 +113,6 @@ public class Constants {
|
||||
public static final String S3N_FOLDER_SUFFIX = "_$folder$";
|
||||
public static final String FS_S3A_BLOCK_SIZE = "fs.s3a.block.size";
|
||||
public static final String FS_S3A = "s3a";
|
||||
|
||||
public static final int S3A_DEFAULT_PORT = -1;
|
||||
}
|
||||
|
@ -0,0 +1,47 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DelegateToFileSystem;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* S3A implementation of AbstractFileSystem.
|
||||
* This impl delegates to the S3AFileSystem
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class S3A extends DelegateToFileSystem{
|
||||
|
||||
public S3A(URI theUri, Configuration conf)
|
||||
throws IOException, URISyntaxException {
|
||||
super(theUri, new S3AFileSystem(), conf, "s3a", false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUriDefaultPort() {
|
||||
return Constants.S3A_DEFAULT_PORT;
|
||||
}
|
||||
}
|
@ -15,7 +15,6 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@ -41,6 +40,11 @@ public boolean isEmptyDirectory() {
|
||||
return isEmptyDirectory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getOwner() {
|
||||
return System.getProperty("user.name");
|
||||
}
|
||||
|
||||
/** Compare if this object is equal to another object
|
||||
* @param o the object to be compared.
|
||||
* @return true if two file status has the same path name; false if not.
|
||||
@ -60,4 +64,23 @@ public boolean equals(Object o) {
|
||||
public int hashCode() {
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
/** Get the modification time of the file/directory.
|
||||
*
|
||||
* s3a uses objects as "fake" directories, which are not updated to
|
||||
* reflect the accurate modification time. We choose to report the
|
||||
* current time because some parts of the ecosystem (e.g. the
|
||||
* HistoryServer) use modification time to ignore "old" directories.
|
||||
*
|
||||
* @return for files the modification time in milliseconds since January 1,
|
||||
* 1970 UTC or for directories the current time.
|
||||
*/
|
||||
@Override
|
||||
public long getModificationTime(){
|
||||
if(isDirectory()){
|
||||
return System.currentTimeMillis();
|
||||
} else {
|
||||
return super.getModificationTime();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -265,15 +265,24 @@ public void initialize(URI name, Configuration conf) throws IOException {
|
||||
*
|
||||
* @return "s3a"
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "s3a";
|
||||
}
|
||||
|
||||
/** Returns a URI whose scheme and authority identify this FileSystem.*/
|
||||
/**
|
||||
* Returns a URI whose scheme and authority identify this FileSystem.
|
||||
*/
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDefaultPort() {
|
||||
return Constants.S3A_DEFAULT_PORT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the S3 client used by this filesystem.
|
||||
* @return AmazonS3Client
|
||||
@ -339,8 +348,10 @@ public FSDataInputStream open(Path f, int bufferSize)
|
||||
* @throws IOException
|
||||
* @see #setPermission(Path, FsPermission)
|
||||
*/
|
||||
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
|
||||
int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
|
||||
@Override
|
||||
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
String key = pathToKey(f);
|
||||
|
||||
if (!overwrite && exists(f)) {
|
||||
|
@ -47,10 +47,14 @@ recursive file-by-file operations. They take time at least proportional to
|
||||
the number of files, during which time partial updates may be visible. If
|
||||
the operations are interrupted, the filesystem is left in an intermediate state.
|
||||
|
||||
## Warning #2: Because Object stores don't track modification times of directories,
|
||||
features of Hadoop relying on this can have unexpected behaviour. E.g. the
|
||||
AggregatedLogDeletionService of YARN will not remove the appropriate logfiles.
|
||||
|
||||
For further discussion on these topics, please consult
|
||||
[The Hadoop FileSystem API Definition](../../../hadoop-project-dist/hadoop-common/filesystem/index.html).
|
||||
|
||||
## Warning #2: your AWS credentials are valuable
|
||||
## Warning #3: your AWS credentials are valuable
|
||||
|
||||
Your AWS credentials not only pay for services, they offer read and write
|
||||
access to the data. Anyone with the credentials can not only read your datasets
|
||||
@ -301,6 +305,12 @@ If you do any of these: change your credentials immediately!
|
||||
<description>The implementation class of the S3A Filesystem</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.AbstractFileSystem.s3a.impl</name>
|
||||
<value>org.apache.hadoop.fs.s3a.S3A</value>
|
||||
<description>The implementation class of the S3A AbstractFileSystem.</description>
|
||||
</property>
|
||||
|
||||
### S3AFastOutputStream
|
||||
**Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.junit.internal.AssumptionViolatedException;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -51,4 +52,24 @@ public static S3AFileSystem createTestFileSystem(Configuration conf) throws
|
||||
fs1.initialize(testURI, conf);
|
||||
return fs1;
|
||||
}
|
||||
|
||||
public static FileContext createTestFileContext(Configuration conf) throws
|
||||
IOException {
|
||||
String fsname = conf.getTrimmed(TestS3AFileSystemContract.TEST_FS_S3A_NAME, "");
|
||||
|
||||
boolean liveTest = !StringUtils.isEmpty(fsname);
|
||||
URI testURI = null;
|
||||
if (liveTest) {
|
||||
testURI = URI.create(fsname);
|
||||
liveTest = testURI.getScheme().equals(Constants.FS_S3A);
|
||||
}
|
||||
if (!liveTest) {
|
||||
// This doesn't work with our JUnit 3 style test cases, so instead we'll
|
||||
// make this whole class not run by default
|
||||
throw new AssumptionViolatedException(
|
||||
"No test filesystem in " + TestS3AFileSystemContract.TEST_FS_S3A_NAME);
|
||||
}
|
||||
FileContext fc = FileContext.getFileContext(testURI,conf);
|
||||
return fc;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.s3a.fileContext;
|
||||
|
||||
import org.apache.hadoop.fs.TestFileContext;
|
||||
|
||||
/**
|
||||
* Implementation of TestFileContext for S3a
|
||||
*/
|
||||
public class TestS3AFileContext extends TestFileContext{
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/**
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.s3a.fileContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContextCreateMkdirBaseTest;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.junit.Before;
|
||||
|
||||
/**
|
||||
* Extends FileContextCreateMkdirBaseTest for a S3a FileContext
|
||||
*/
|
||||
public class TestS3AFileContextCreateMkdir
|
||||
extends FileContextCreateMkdirBaseTest {
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException, Exception {
|
||||
Configuration conf = new Configuration();
|
||||
fc = S3ATestUtils.createTestFileContext(conf);
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.s3a.fileContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContextMainOperationsBaseTest;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* S3A implementation of FileContextMainOperationsBaseTest
|
||||
*/
|
||||
public class TestS3AFileContextMainOperations
|
||||
extends FileContextMainOperationsBaseTest {
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException, Exception {
|
||||
Configuration conf = new Configuration();
|
||||
fc = S3ATestUtils.createTestFileContext(conf);
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean listCorruptedBlocksSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testCreateFlagAppendExistingFile() throws IOException {
|
||||
//append not supported, so test removed
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testCreateFlagCreateAppendExistingFile() throws IOException {
|
||||
//append not supported, so test removed
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testSetVerifyChecksum() throws IOException {
|
||||
//checksums ignored, so test removed
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.s3a.fileContext;
|
||||
|
||||
import java.net.URI;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FCStatisticsBaseTest;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
||||
/**
|
||||
* S3a implementation of FCStatisticsBaseTest
|
||||
*/
|
||||
public class TestS3AFileContextStatistics extends FCStatisticsBaseTest {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
fc = S3ATestUtils.createTestFileContext(conf);
|
||||
fc.mkdir(fileContextTestHelper.getTestRootPath(fc, "test"), FileContext.DEFAULT_PERM, true);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
fc.delete(fileContextTestHelper.getTestRootPath(fc, "test"), true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void verifyReadBytes(FileSystem.Statistics stats) {
|
||||
// one blockSize for read, one for pread
|
||||
Assert.assertEquals(2 * blockSize, stats.getBytesRead());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void verifyWrittenBytes(FileSystem.Statistics stats) {
|
||||
//No extra bytes are written
|
||||
Assert.assertEquals(blockSize, stats.getBytesWritten());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected URI getFsUri() {
|
||||
return fc.getHomeDirectory().toUri();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
/**
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.s3a.fileContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContextURIBase;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* S3a implementation of FileContextURIBase
|
||||
*/
|
||||
public class TestS3AFileContextURI extends FileContextURIBase {
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException, Exception {
|
||||
Configuration conf = new Configuration();
|
||||
fc1 = S3ATestUtils.createTestFileContext(conf);
|
||||
fc2 = S3ATestUtils.createTestFileContext(conf); //different object, same FS
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testFileStatus() throws IOException {
|
||||
//test disabled (the statistics tested with this method are not relevant for an S3FS)
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
/**
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.s3a.fileContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContextUtilBase;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.junit.Before;
|
||||
|
||||
/**
|
||||
* S3A implementation of FileContextUtilBase
|
||||
*/
|
||||
public class TestS3AFileContextUtil extends FileContextUtilBase {
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException, Exception {
|
||||
Configuration conf = new Configuration();
|
||||
fc = S3ATestUtils.createTestFileContext(conf);
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.s3a.yarn;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FsStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestS3A {
|
||||
private FileContext fc;
|
||||
|
||||
@Rule
|
||||
public final Timeout testTimeout = new Timeout(90000);
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
fc = S3ATestUtils.createTestFileContext(conf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
fc.delete(getTestPath(),true);
|
||||
}
|
||||
|
||||
protected Path getTestPath() {
|
||||
return new Path("/tests3afc");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testS3AStatus() throws Exception {
|
||||
FsStatus fsStatus = fc.getFsStatus(null);
|
||||
assertNotNull(fsStatus);
|
||||
assertTrue("Used capacity should be positive: " + fsStatus.getUsed(),
|
||||
fsStatus.getUsed() >= 0);
|
||||
assertTrue("Remaining capacity should be positive: " + fsStatus
|
||||
.getRemaining(),
|
||||
fsStatus.getRemaining() >= 0);
|
||||
assertTrue("Capacity should be positive: " + fsStatus.getCapacity(),
|
||||
fsStatus.getCapacity() >= 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testS3ACreateFileInSubDir() throws Exception {
|
||||
Path dirPath = getTestPath();
|
||||
fc.mkdir(dirPath,FileContext.DIR_DEFAULT_PERM,true);
|
||||
Path filePath = new Path(dirPath, "file");
|
||||
try (FSDataOutputStream file = fc.create(filePath, EnumSet.of(CreateFlag
|
||||
.CREATE))) {
|
||||
file.write(666);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,144 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.s3a.yarn;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.examples.WordCount;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
|
||||
import org.junit.After;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestS3AMiniYarnCluster {
|
||||
|
||||
private final Configuration conf = new YarnConfiguration();
|
||||
private S3AFileSystem fs;
|
||||
private MiniYARNCluster yarnCluster;
|
||||
private final String rootPath = "/tests/MiniClusterWordCount/";
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws IOException {
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
fs.mkdirs(new Path(rootPath + "input/"));
|
||||
|
||||
yarnCluster = new MiniYARNCluster("MiniClusterWordCount", // testName
|
||||
1, // number of node managers
|
||||
1, // number of local log dirs per node manager
|
||||
1); // number of hdfs dirs per node manager
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterTest() throws IOException {
|
||||
fs.delete(new Path(rootPath), true);
|
||||
yarnCluster.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithMiniCluster() throws Exception {
|
||||
Path input = new Path(rootPath + "input/in.txt");
|
||||
input = input.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||
Path output = new Path(rootPath + "output/");
|
||||
output = output.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||
|
||||
writeStringToFile(input, "first line\nsecond line\nthird line");
|
||||
|
||||
Job job = Job.getInstance(conf, "word count");
|
||||
job.setJarByClass(WordCount.class);
|
||||
job.setMapperClass(WordCount.TokenizerMapper.class);
|
||||
job.setCombinerClass(WordCount.IntSumReducer.class);
|
||||
job.setReducerClass(WordCount.IntSumReducer.class);
|
||||
job.setOutputKeyClass(Text.class);
|
||||
job.setOutputValueClass(IntWritable.class);
|
||||
FileInputFormat.addInputPath(job, input);
|
||||
FileOutputFormat.setOutputPath(job, output);
|
||||
|
||||
int exitCode = (job.waitForCompletion(true) ? 0 : 1);
|
||||
assertEquals("Returned error code.", 0, exitCode);
|
||||
|
||||
assertTrue(fs.exists(new Path(output, "_SUCCESS")));
|
||||
String outputAsStr = readStringFromFile(new Path(output, "part-r-00000"));
|
||||
Map<String, Integer> resAsMap = getResultAsMap(outputAsStr);
|
||||
|
||||
assertEquals(4, resAsMap.size());
|
||||
assertEquals(1, (int) resAsMap.get("first"));
|
||||
assertEquals(1, (int) resAsMap.get("second"));
|
||||
assertEquals(1, (int) resAsMap.get("third"));
|
||||
assertEquals(3, (int) resAsMap.get("line"));
|
||||
}
|
||||
|
||||
/**
|
||||
* helper method
|
||||
*/
|
||||
private Map<String, Integer> getResultAsMap(String outputAsStr) throws IOException {
|
||||
Map<String, Integer> result = new HashMap<>();
|
||||
for (String line : outputAsStr.split("\n")) {
|
||||
String[] tokens = line.split("\t");
|
||||
result.put(tokens[0], Integer.parseInt(tokens[1]));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* helper method
|
||||
*/
|
||||
private void writeStringToFile(Path path, String string) throws IOException {
|
||||
FileContext fc = S3ATestUtils.createTestFileContext(conf);
|
||||
try (FSDataOutputStream file = fc.create(path,
|
||||
EnumSet.of(CreateFlag.CREATE))) {
|
||||
file.write(string.getBytes());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* helper method
|
||||
*/
|
||||
private String readStringFromFile(Path path) {
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
long bytesLen = fs.getFileStatus(path).getLen();
|
||||
byte[] buffer = new byte[(int) bytesLen];
|
||||
IOUtils.readFully(in, buffer, 0, buffer.length);
|
||||
return new String(buffer);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to read from [" + path + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -43,9 +43,8 @@
|
||||
# add the property test.fs.s3n.name to point to an S3 filesystem URL
|
||||
# Add the credentials for the service you are testing against
|
||||
-->
|
||||
<include xmlns="http://www.w3.org/2001/XInclude"
|
||||
href="auth-keys.xml"/>
|
||||
|
||||
|
||||
<include xmlns="http://www.w3.org/2001/XInclude" href="auth-keys.xml">
|
||||
<fallback/>
|
||||
</include>
|
||||
|
||||
</configuration>
|
||||
|
Loading…
Reference in New Issue
Block a user