YARN-7537. Add ability to load hbase config from distributed file system.
Contributed by Prabhu Joseph
This commit is contained in:
parent
1a78794227
commit
d45669cd3c
@ -74,6 +74,19 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-api</artifactId>
|
||||
|
@ -17,11 +17,13 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
||||
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.Query;
|
||||
@ -40,7 +42,6 @@ public final class HBaseTimelineStorageUtils {
|
||||
private HBaseTimelineStorageUtils() {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param conf YARN configuration. Used to see if there is an explicit config
|
||||
* pointing to the HBase config file to read. It should not be null
|
||||
@ -48,28 +49,33 @@ private HBaseTimelineStorageUtils() {
|
||||
* @return a configuration with the HBase configuration from the classpath,
|
||||
* optionally overwritten by the timeline service configuration URL if
|
||||
* specified.
|
||||
* @throws MalformedURLException if a timeline service HBase configuration URL
|
||||
* is specified but is a malformed URL.
|
||||
* @throws IOException if a timeline service HBase configuration URL
|
||||
* is specified but unable to read it.
|
||||
*/
|
||||
public static Configuration getTimelineServiceHBaseConf(Configuration conf)
|
||||
throws MalformedURLException {
|
||||
throws IOException {
|
||||
if (conf == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
|
||||
Configuration hbaseConf;
|
||||
String timelineServiceHBaseConfFileURL =
|
||||
String timelineServiceHBaseConfFilePath =
|
||||
conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
|
||||
if (timelineServiceHBaseConfFileURL != null
|
||||
&& timelineServiceHBaseConfFileURL.length() > 0) {
|
||||
|
||||
if (timelineServiceHBaseConfFilePath != null
|
||||
&& timelineServiceHBaseConfFilePath.length() > 0) {
|
||||
LOG.info("Using hbase configuration at " +
|
||||
timelineServiceHBaseConfFileURL);
|
||||
timelineServiceHBaseConfFilePath);
|
||||
// create a clone so that we don't mess with out input one
|
||||
hbaseConf = new Configuration(conf);
|
||||
Configuration plainHBaseConf = new Configuration(false);
|
||||
URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
|
||||
plainHBaseConf.addResource(hbaseSiteXML);
|
||||
Path hbaseConfigPath = new Path(timelineServiceHBaseConfFilePath);
|
||||
try (FileSystem fs =
|
||||
FileSystem.newInstance(hbaseConfigPath.toUri(), conf);
|
||||
FSDataInputStream in = fs.open(hbaseConfigPath)) {
|
||||
plainHBaseConf.addResource(in);
|
||||
HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
|
||||
}
|
||||
} else {
|
||||
// default to what is on the classpath
|
||||
hbaseConf = HBaseConfiguration.create(conf);
|
||||
|
@ -18,16 +18,90 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
/**
|
||||
* Unit tests for HBaseTimelineStorageUtils static methos.
|
||||
*/
|
||||
public class TestHBaseTimelineStorageUtils {
|
||||
|
||||
private String hbaseConfigPath = "target/hbase-site.xml";
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
// Input Hbase Configuration
|
||||
Configuration hbaseConf = new Configuration();
|
||||
hbaseConf.set("input", "test");
|
||||
|
||||
//write the document to a buffer (not directly to the file, as that
|
||||
//can cause the file being written to get read which will then fail.
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
hbaseConf.writeXml(bytesOut);
|
||||
bytesOut.close();
|
||||
|
||||
//write the bytes to the file
|
||||
File file = new File(hbaseConfigPath);
|
||||
OutputStream os = new FileOutputStream(file);
|
||||
os.write(bytesOut.toByteArray());
|
||||
os.close();
|
||||
}
|
||||
|
||||
@Test(expected=NullPointerException.class)
|
||||
public void testGetTimelineServiceHBaseConfNullArgument() throws Exception {
|
||||
HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithHbaseConfAtLocalFileSystem() throws IOException {
|
||||
// Verifying With Hbase Conf from Local FileSystem
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE,
|
||||
hbaseConfigPath);
|
||||
Configuration hbaseConfFromLocal =
|
||||
HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
|
||||
Assert.assertEquals("Failed to read hbase config from Local FileSystem",
|
||||
"test", hbaseConfFromLocal.get("input"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithHbaseConfAtHdfsFileSystem() throws IOException {
|
||||
MiniDFSCluster hdfsCluster = null;
|
||||
try {
|
||||
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
|
||||
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
|
||||
.numDataNodes(1).build();
|
||||
|
||||
FileSystem fs = hdfsCluster.getFileSystem();
|
||||
Path path = new Path("/tmp/hdfs-site.xml");
|
||||
fs.copyFromLocalFile(new Path(hbaseConfigPath), path);
|
||||
|
||||
// Verifying With Hbase Conf from HDFS FileSystem
|
||||
Configuration conf = new Configuration(hdfsConfig);
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE,
|
||||
path.toString());
|
||||
Configuration hbaseConfFromHdfs =
|
||||
HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
|
||||
Assert.assertEquals("Failed to read hbase config from Hdfs FileSystem",
|
||||
"test", hbaseConfFromHdfs.get("input"));
|
||||
} finally {
|
||||
if (hdfsCluster != null) {
|
||||
hdfsCluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user