HDFS-16738. Invalid CallerContext caused NullPointerException (#4791)
This commit is contained in:
parent
880686d1e3
commit
1691cccc89
@ -541,18 +541,21 @@ public static String parseSpecialValue(String content, String key) {
|
|||||||
* @return The actual client's machine.
|
* @return The actual client's machine.
|
||||||
*/
|
*/
|
||||||
public static String getClientMachine(final String[] ipProxyUsers) {
|
public static String getClientMachine(final String[] ipProxyUsers) {
|
||||||
|
String clientMachine = null;
|
||||||
String cc = clientInfoFromContext(ipProxyUsers);
|
String cc = clientInfoFromContext(ipProxyUsers);
|
||||||
if (cc != null) {
|
if (cc != null) {
|
||||||
// if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
|
// if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
|
||||||
// return "1.2.3.4" as the client machine.
|
// return "1.2.3.4" as the client machine.
|
||||||
String key = CallerContext.CLIENT_IP_STR +
|
String key = CallerContext.CLIENT_IP_STR +
|
||||||
CallerContext.Builder.KEY_VALUE_SEPARATOR;
|
CallerContext.Builder.KEY_VALUE_SEPARATOR;
|
||||||
return parseSpecialValue(cc, key);
|
clientMachine = parseSpecialValue(cc, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
String clientMachine = Server.getRemoteAddress();
|
if (clientMachine == null) {
|
||||||
if (clientMachine == null) { //not a RPC client
|
clientMachine = Server.getRemoteAddress();
|
||||||
clientMachine = "";
|
if (clientMachine == null) { //not a RPC client
|
||||||
|
clientMachine = "";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return clientMachine;
|
return clientMachine;
|
||||||
}
|
}
|
||||||
|
@ -28,22 +28,29 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||||
import org.apache.hadoop.ipc.CallerContext;
|
import org.apache.hadoop.ipc.CallerContext;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.jupiter.api.Timeout;
|
||||||
|
|
||||||
public class TestNameNodeRpcServer {
|
public class TestNameNodeRpcServer {
|
||||||
|
|
||||||
@ -91,6 +98,66 @@ private static String getPreferredLocation(DistributedFileSystem fs,
|
|||||||
// trials. 1/3^20=3e-10, so that should be good enough.
|
// trials. 1/3^20=3e-10, so that should be good enough.
|
||||||
static final int ITERATIONS_TO_USE = 20;
|
static final int ITERATIONS_TO_USE = 20;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Timeout(30000)
|
||||||
|
public void testNamenodeRpcClientIpProxyWithFailBack() throws Exception {
|
||||||
|
// Make 3 nodes & racks so that we have a decent shot of detecting when
|
||||||
|
// our change overrides the random choice of datanode.
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
|
||||||
|
final CallerContext original = CallerContext.getCurrent();
|
||||||
|
|
||||||
|
MiniQJMHACluster qjmhaCluster = null;
|
||||||
|
try {
|
||||||
|
String baseDir = GenericTestUtils.getRandomizedTempPath();
|
||||||
|
MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
|
||||||
|
builder.getDfsBuilder().numDataNodes(3);
|
||||||
|
qjmhaCluster = builder.baseDir(baseDir).build();
|
||||||
|
MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();
|
||||||
|
dfsCluster.waitActive();
|
||||||
|
dfsCluster.transitionToActive(0);
|
||||||
|
|
||||||
|
// Set the caller context to set the ip address
|
||||||
|
CallerContext.setCurrent(
|
||||||
|
new CallerContext.Builder("test", conf)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
dfsCluster.getFileSystem(0).setPermission(
|
||||||
|
new Path("/"), FsPermission.getDirDefault());
|
||||||
|
|
||||||
|
// Run as fake joe to authorize the test
|
||||||
|
UserGroupInformation joe =
|
||||||
|
UserGroupInformation.createUserForTesting("fake_joe",
|
||||||
|
new String[]{"fake_group"});
|
||||||
|
|
||||||
|
FileSystem joeFs = joe.doAs((PrivilegedExceptionAction<FileSystem>) () ->
|
||||||
|
FileSystem.get(dfsCluster.getURI(0), conf));
|
||||||
|
|
||||||
|
Path testPath = new Path("/foo");
|
||||||
|
// Write a sample file
|
||||||
|
FSDataOutputStream stream = joeFs.create(testPath);
|
||||||
|
stream.write("Hello world!\n".getBytes(StandardCharsets.UTF_8));
|
||||||
|
stream.close();
|
||||||
|
|
||||||
|
qjmhaCluster.getDfsCluster().transitionToStandby(0);
|
||||||
|
qjmhaCluster.getDfsCluster().transitionToActive(1);
|
||||||
|
|
||||||
|
DistributedFileSystem nn1 = dfsCluster.getFileSystem(1);
|
||||||
|
assertNotNull(nn1.getFileStatus(testPath));
|
||||||
|
} finally {
|
||||||
|
CallerContext.setCurrent(original);
|
||||||
|
if (qjmhaCluster != null) {
|
||||||
|
try {
|
||||||
|
qjmhaCluster.shutdown();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Reset the config
|
||||||
|
conf.unset(DFS_NAMENODE_IP_PROXY_USERS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A test to make sure that if an authorized user adds "clientIp:" to their
|
* A test to make sure that if an authorized user adds "clientIp:" to their
|
||||||
* caller context, it will be used to make locality decisions on the NN.
|
* caller context, it will be used to make locality decisions on the NN.
|
||||||
|
Loading…
Reference in New Issue
Block a user