HDFS-17576. Support user defined auth Callback. (#6945)

This commit is contained in:
Tsz-Wo Nicholas Sze 2024-07-20 15:21:06 +08:00 committed by GitHub
parent 9dad697dbc
commit a5eb5e9611
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 145 additions and 11 deletions

View File

@ -236,6 +236,9 @@ public interface HdfsClientConfigKeys {
String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
"dfs.data.transfer.saslproperties.resolver.class"; "dfs.data.transfer.saslproperties.resolver.class";
String DFS_DATA_TRANSFER_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY
= "dfs.data.transfer.sasl.CustomizedCallbackHandler.class";
String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY =
"dfs.encrypt.data.transfer.cipher.key.bitlength"; "dfs.encrypt.data.transfer.cipher.key.bitlength";
int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128; int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128;

View File

@ -102,9 +102,9 @@ public static void checkSaslComplete(SaslParticipant sasl,
Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList( Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
saslProps.get(Sasl.QOP).split(","))); saslProps.get(Sasl.QOP).split(",")));
String negotiatedQop = sasl.getNegotiatedQop(); String negotiatedQop = sasl.getNegotiatedQop();
LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}", LOG.debug("{}: Verifying QOP: requested = {}, negotiated = {}",
requestedQop, negotiatedQop); sasl, requestedQop, negotiatedQop);
if (!requestedQop.contains(negotiatedQop)) { if (negotiatedQop != null && !requestedQop.contains(negotiatedQop)) {
throw new IOException(String.format("SASL handshake completed, but " + throw new IOException(String.format("SASL handshake completed, but " +
"channel does not have acceptable quality of protection, " + "channel does not have acceptable quality of protection, " +
"requested = %s, negotiated = %s", requestedQop, negotiatedQop)); "requested = %s, negotiated = %s", requestedQop, negotiatedQop));

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import java.io.IOException;
import java.util.List;
/** For handling customized {@link Callback}. */
public interface CustomizedCallbackHandler {
class DefaultHandler implements CustomizedCallbackHandler{
@Override
public void handleCallback(List<Callback> callbacks, String username, char[] password)
throws UnsupportedCallbackException {
if (!callbacks.isEmpty()) {
throw new UnsupportedCallbackException(callbacks.get(0));
}
}
}
void handleCallback(List<Callback> callbacks, String name, char[] password)
throws UnsupportedCallbackException, IOException;
}

View File

@ -29,6 +29,7 @@
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -46,6 +47,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherOption; import org.apache.hadoop.crypto.CipherOption;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@ -178,7 +180,7 @@ private IOStreamPair getEncryptedStreams(Peer peer,
dnConf.getEncryptionAlgorithm()); dnConf.getEncryptionAlgorithm());
} }
CallbackHandler callbackHandler = new SaslServerCallbackHandler( final CallbackHandler callbackHandler = new SaslServerCallbackHandler(dnConf.getConf(),
new PasswordFunction() { new PasswordFunction() {
@Override @Override
public char[] apply(String userName) throws IOException { public char[] apply(String userName) throws IOException {
@ -195,7 +197,7 @@ public char[] apply(String userName) throws IOException {
* logic. It's similar to a Guava Function, but we need to let it throw * logic. It's similar to a Guava Function, but we need to let it throw
* exceptions. * exceptions.
*/ */
private interface PasswordFunction { interface PasswordFunction {
/** /**
* Returns the SASL password for the given user name. * Returns the SASL password for the given user name.
@ -210,18 +212,27 @@ private interface PasswordFunction {
/** /**
* Sets user name and password when asked by the server-side SASL object. * Sets user name and password when asked by the server-side SASL object.
*/ */
private static final class SaslServerCallbackHandler static final class SaslServerCallbackHandler
implements CallbackHandler { implements CallbackHandler {
private final PasswordFunction passwordFunction; private final PasswordFunction passwordFunction;
private final CustomizedCallbackHandler customizedCallbackHandler;
/** /**
* Creates a new SaslServerCallbackHandler. * Creates a new SaslServerCallbackHandler.
* *
* @param passwordFunction for determing the user's password * @param passwordFunction for determing the user's password
*/ */
public SaslServerCallbackHandler(PasswordFunction passwordFunction) { SaslServerCallbackHandler(Configuration conf, PasswordFunction passwordFunction) {
this.passwordFunction = passwordFunction; this.passwordFunction = passwordFunction;
final Class<? extends CustomizedCallbackHandler> clazz = conf.getClass(
HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
CustomizedCallbackHandler.DefaultHandler.class, CustomizedCallbackHandler.class);
try {
this.customizedCallbackHandler = clazz.newInstance();
} catch (Exception e) {
throw new IllegalStateException("Failed to create a new instance of " + clazz, e);
}
} }
@Override @Override
@ -230,6 +241,7 @@ public void handle(Callback[] callbacks) throws IOException,
NameCallback nc = null; NameCallback nc = null;
PasswordCallback pc = null; PasswordCallback pc = null;
AuthorizeCallback ac = null; AuthorizeCallback ac = null;
List<Callback> unknownCallbacks = null;
for (Callback callback : callbacks) { for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) { if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback; ac = (AuthorizeCallback) callback;
@ -240,8 +252,10 @@ public void handle(Callback[] callbacks) throws IOException,
} else if (callback instanceof RealmCallback) { } else if (callback instanceof RealmCallback) {
continue; // realm is ignored continue; // realm is ignored
} else { } else {
throw new UnsupportedCallbackException(callback, if (unknownCallbacks == null) {
"Unrecognized SASL Callback: " + callback); unknownCallbacks = new ArrayList<>();
}
unknownCallbacks.add(callback);
} }
} }
@ -253,6 +267,12 @@ public void handle(Callback[] callbacks) throws IOException,
ac.setAuthorized(true); ac.setAuthorized(true);
ac.setAuthorizedID(ac.getAuthorizationID()); ac.setAuthorizedID(ac.getAuthorizationID());
} }
if (unknownCallbacks != null) {
final String name = nc != null ? nc.getDefaultName() : null;
final char[] password = name != null ? passwordFunction.apply(name) : null;
customizedCallbackHandler.handleCallback(unknownCallbacks, name, password);
}
} }
} }
@ -298,7 +318,7 @@ private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
Map<String, String> saslProps = saslPropsResolver.getServerProperties( Map<String, String> saslProps = saslPropsResolver.getServerProperties(
getPeerAddress(peer)); getPeerAddress(peer));
CallbackHandler callbackHandler = new SaslServerCallbackHandler( final CallbackHandler callbackHandler = new SaslServerCallbackHandler(dnConf.getConf(),
new PasswordFunction() { new PasswordFunction() {
@Override @Override
public char[] apply(String userName) throws IOException { public char[] apply(String userName) throws IOException {

View File

@ -2641,6 +2641,15 @@
</description> </description>
</property> </property>
<property>
<name>dfs.data.transfer.sasl.CustomizedCallbackHandler.class</name>
<value></value>
<description>
Some security provider may define a new javax.security.auth.callback.Callback.
This property allows users to configure a customized callback handler.
</description>
</property>
<property> <property>
<name>dfs.journalnode.rpc-address</name> <name>dfs.journalnode.rpc-address</name>
<value>0.0.0.0:8485</value> <value>0.0.0.0:8485</value>

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer.SaslServerCallbackHandler;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import java.util.Arrays;
import java.util.List;
public class TestCustomizedCallbackHandler {
public static final Logger LOG = LoggerFactory.getLogger(TestCustomizedCallbackHandler.class);
static class MyCallback implements Callback { }
static class MyCallbackHandler implements CustomizedCallbackHandler {
@Override
public void handleCallback(List<Callback> callbacks, String name, char[] password) {
LOG.info("{}: handling {} for {}", getClass().getSimpleName(), callbacks, name);
}
}
@Test
public void testCustomizedCallbackHandler() throws Exception {
final Configuration conf = new Configuration();
final Callback[] callbacks = {new MyCallback()};
// without setting conf, expect UnsupportedCallbackException
try {
new SaslServerCallbackHandler(conf, String::toCharArray).handle(callbacks);
Assert.fail("Expected UnsupportedCallbackException for " + Arrays.asList(callbacks));
} catch (UnsupportedCallbackException e) {
LOG.info("The failure is expected", e);
}
// set conf and expect success
conf.setClass(HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_CUSTOMIZEDCALLBACKHANDLER_CLASS_KEY,
MyCallbackHandler.class, CustomizedCallbackHandler.class);
new SaslServerCallbackHandler(conf, String::toCharArray).handle(callbacks);
}
}