HADOOP-15671. AliyunOSS: Support Assume Roles in AliyunOSS. Contributed by Jinhu Wu.

This commit is contained in:
Sammi Chen 2018-09-25 19:48:30 +08:00
parent 93b0f540ed
commit 2b635125fb
7 changed files with 248 additions and 12 deletions

View File

@ -120,7 +120,8 @@ public synchronized void close() throws IOException {
if (null == partETags) {
throw new IOException("Failed to multipart upload to oss, abort it.");
}
store.completeMultipartUpload(key, uploadId, partETags);
store.completeMultipartUpload(key, uploadId,
new ArrayList<>(partETags));
}
} finally {
removePartFiles();
@ -129,7 +130,7 @@ public synchronized void close() throws IOException {
}
@Override
public void write(int b) throws IOException {
public synchronized void write(int b) throws IOException {
singleByte[0] = (byte)b;
write(singleByte, 0, 1);
}

View File

@ -149,7 +149,7 @@ public void initialize(URI uri, Configuration conf,
"null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
}
CredentialsProvider provider =
AliyunOSSUtils.getCredentialsProvider(conf);
AliyunOSSUtils.getCredentialsProvider(uri, conf);
ossClient = new OSSClient(endPoint, provider, clientConf);
uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
@ -168,6 +168,8 @@ public void initialize(URI uri, Configuration conf,
multipartThreshold = 1024 * 1024 * 1024;
}
bucketName = uri.getHost();
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
if (StringUtils.isNotEmpty(cannedACLName)) {
CannedAccessControlList cannedACL =
@ -176,7 +178,6 @@ public void initialize(URI uri, Configuration conf,
}
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
bucketName = uri.getHost();
}
/**

View File

@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.google.common.base.Preconditions;
@ -95,13 +96,14 @@ public static long calculatePartSize(long contentLength, long minPartSize) {
* Create credential provider specified by configuration, or create default
* credential provider if not specified.
*
* @param uri uri passed by caller
* @param conf configuration
* @return a credential provider
* @throws IOException on any problem. Class construction issues may be
* nested inside the IOE.
*/
public static CredentialsProvider getCredentialsProvider(Configuration conf)
throws IOException {
public static CredentialsProvider getCredentialsProvider(
URI uri, Configuration conf) throws IOException {
CredentialsProvider credentials;
String className = conf.getTrimmed(CREDENTIALS_PROVIDER_KEY);
@ -117,7 +119,7 @@ public static CredentialsProvider getCredentialsProvider(Configuration conf)
try {
credentials =
(CredentialsProvider)credClass.getDeclaredConstructor(
Configuration.class).newInstance(conf);
URI.class, Configuration.class).newInstance(uri, conf);
} catch (NoSuchMethodException | SecurityException e) {
credentials =
(CredentialsProvider)credClass.getDeclaredConstructor()

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.InvalidCredentialsException;
import com.aliyun.oss.common.auth.STSAssumeRoleSessionCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
/**
* Support assumed role credentials for authenticating with Aliyun.
* roleArn is configured in core-site.xml
*/
public class AssumedRoleCredentialProvider implements CredentialsProvider {
private static final Logger LOG =
LoggerFactory.getLogger(AssumedRoleCredentialProvider.class);
public static final String NAME
= "org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider";
private Credentials credentials;
private String roleArn;
private long duration;
private String stsEndpoint;
private String sessionName;
private double expiredFactor;
private STSAssumeRoleSessionCredentialsProvider stsCredentialsProvider;
public AssumedRoleCredentialProvider(URI uri, Configuration conf) {
roleArn = conf.getTrimmed(Constants.ROLE_ARN, "");
if (StringUtils.isEmpty(roleArn)) {
throw new InvalidCredentialsException(
"fs.oss.assumed.role.arn is empty");
}
duration = conf.getLong(Constants.ASSUMED_ROLE_DURATION,
Constants.ASSUMED_ROLE_DURATION_DEFAULT);
expiredFactor = conf.getDouble(Constants.ASSUMED_ROLE_STS_EXPIRED_FACTOR,
Constants.ASSUMED_ROLE_STS_EXPIRED_FACTOR_DEFAULT);
stsEndpoint = conf.getTrimmed(Constants.ASSUMED_ROLE_STS_ENDPOINT, "");
if (StringUtils.isEmpty(stsEndpoint)) {
throw new InvalidCredentialsException(
"fs.oss.assumed.role.sts.endpoint is empty");
}
sessionName = conf.getTrimmed(Constants.ASSUMED_ROLE_SESSION_NAME, "");
String accessKeyId;
String accessKeySecret;
try {
accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID);
accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET);
} catch (IOException e) {
throw new InvalidCredentialsException(e);
}
try {
DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
} catch (ClientException e) {
throw new InvalidCredentialsException(e);
}
stsCredentialsProvider = new STSAssumeRoleSessionCredentialsProvider(
new com.aliyuncs.auth.BasicCredentials(accessKeyId, accessKeySecret),
roleArn, DefaultProfile.getProfile("", accessKeyId, accessKeySecret))
.withExpiredDuration(duration).withExpiredFactor(expiredFactor);
if (!StringUtils.isEmpty(sessionName)) {
stsCredentialsProvider.withRoleSessionName(sessionName);
}
}
@Override
public void setCredentials(Credentials creds) {
throw new InvalidCredentialsException(
"Should not set credentials from external call");
}
@Override
public Credentials getCredentials() {
credentials = stsCredentialsProvider.getCredentials();
if (credentials == null) {
throw new InvalidCredentialsException("Invalid credentials");
}
return credentials;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.common.utils.AuthUtils;
import com.aliyun.oss.common.utils.VersionInfoUtils;
/**
@ -42,6 +43,27 @@ private Constants() {
public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
public static final String SECURITY_TOKEN = "fs.oss.securityToken";
// Assume role configurations
public static final String ROLE_ARN = "fs.oss.assumed.role.arn";
public static final String ASSUMED_ROLE_DURATION =
"fs.oss.assumed.role.session.duration";
// Default session duration(in seconds)
public static final long ASSUMED_ROLE_DURATION_DEFAULT = 30 * 60;
// Expired factor of sts token
// For example, if session duration is 900s and expiredFactor is 0.8
// sts token will be refreshed after 900 * 0.8s
public static final String ASSUMED_ROLE_STS_EXPIRED_FACTOR =
"fs.oss.assumed.role.sts.expiredFactor";
public static final double ASSUMED_ROLE_STS_EXPIRED_FACTOR_DEFAULT =
AuthUtils.DEFAULT_EXPIRED_FACTOR;
public static final String ASSUMED_ROLE_STS_ENDPOINT =
"fs.oss.assumed.role.sts.endpoint";
public static final String ASSUMED_ROLE_SESSION_NAME =
"fs.oss.assumed.role.session.name";
// Number of simultaneous connections to oss
public static final String MAXIMUM_CONNECTIONS_KEY =
"fs.oss.connection.maximum";

View File

@ -117,6 +117,56 @@ please raise your issues with them.
</description>
</property>
<property>
<name>fs.oss.assumed.role.arn</name>
<description>
Role ARN for the role to be assumed.
Required if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.assumed.role.sts.endpoint</name>
<description>
STS Token Service endpoint.
Required if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.assumed.role.session.name</name>
<value />
<description>
Session name for the assumed role, must be valid characters
according to Aliyun API. It is optional, will be generated by
oss java sdk if it is empty.
Only used if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.assumed.role.session.duration</name>
<value />
<description>
Duration of assumed roles before it is expired. Default is 30 minutes.
Only used if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.assumed.role.sts.expiredFactor</name>
<value />
<description>
Sts token will be refreshed after (expiredFactor * duration) seconds.
Only used if the fs.oss.credentials.provider is
org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
</description>
</property>
<property>
<name>fs.oss.proxy.host</name>
<description>Hostname of the (optinal) proxy server for Aliyun OSS connection</description>

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.InvalidCredentialsException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract;
@ -27,9 +28,15 @@
import org.junit.Test;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ASSUMED_ROLE_SESSION_NAME;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ASSUMED_ROLE_STS_ENDPOINT;
import static org.apache.hadoop.fs.aliyun.oss.Constants.CREDENTIALS_PROVIDER_KEY;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ROLE_ARN;
import static org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN;
/**
@ -63,16 +70,54 @@ public void testCredentialMissingAccessKeySecret() throws Throwable {
validateCredential(conf);
}
private void validateCredential(Configuration conf) {
@Test
public void testCredentialMissingRoleArn() throws Throwable {
Configuration conf = new Configuration();
conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
conf.set(ROLE_ARN, "");
validateCredential(conf);
}
@Test
public void testCredentialMissingStsEndpoint() throws Throwable {
Configuration conf = new Configuration();
conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
conf.set(ASSUMED_ROLE_STS_ENDPOINT, "");
validateCredential(conf);
}
@Test
public void testCredentialInvalidSessionName() throws Throwable {
Configuration conf = new Configuration();
conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
conf.set(ASSUMED_ROLE_SESSION_NAME, "hadoop oss");
validateCredential(conf);
}
private void validateCredential(URI uri, Configuration conf) {
try {
AliyunCredentialsProvider provider
= new AliyunCredentialsProvider(conf);
CredentialsProvider provider =
AliyunOSSUtils.getCredentialsProvider(uri, conf);
Credentials credentials = provider.getCredentials();
fail("Expected a CredentialInitializationException, got " + credentials);
} catch (InvalidCredentialsException expected) {
// expected
} catch (IOException e) {
fail("Unexpected exception.");
Throwable cause = e.getCause();
if (cause instanceof InvocationTargetException) {
boolean isInstance =
((InvocationTargetException)cause).getTargetException()
instanceof InvalidCredentialsException;
if (!isInstance) {
fail("Unexpected exception.");
}
} else {
fail("Unexpected exception.");
}
}
}
}
private void validateCredential(Configuration conf) {
validateCredential(null, conf);
}
}