HADOOP-11664. Loading predefined EC schemas from configuration. Contributed by Kai Zheng.

This commit is contained in:
Zhe Zhang 2015-03-27 14:52:50 -07:00 committed by Zhe Zhang
parent 9d1175b8fb
commit d9af36b9bd
4 changed files with 272 additions and 0 deletions

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Please define your EC schemas here. Note, once these schemas are loaded
and referenced by EC storage policies, any change to them will be ignored.
You can modify and remove those not used yet, or add new ones.
-->
<schemas>
<schema name="RS-6-3">
<k>6</k>
<m>3</m>
<codec>RS</codec>
</schema>
<schema name="RS-10-4">
<k>10</k>
<m>4</m>
<codec>RS</codec>
</schema>
</schemas>

View File

@ -143,6 +143,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Supported erasure codec classes */ /** Supported erasure codec classes */
public static final String IO_ERASURECODE_CODECS_KEY = "io.erasurecode.codecs"; public static final String IO_ERASURECODE_CODECS_KEY = "io.erasurecode.codecs";
public static final String IO_ERASURECODE_SCHEMA_FILE_KEY =
"io.erasurecode.schema.file";
public static final String IO_ERASURECODE_SCHEMA_FILE_DEFAULT =
"ecschema-def.xml";
/** Use XOR raw coder when possible for the RS codec */ /** Use XOR raw coder when possible for the RS codec */
public static final String IO_ERASURECODE_CODEC_RS_USEXOR_KEY = public static final String IO_ERASURECODE_CODEC_RS_USEXOR_KEY =
"io.erasurecode.codec.rs.usexor"; "io.erasurecode.codec.rs.usexor";

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.w3c.dom.*;
import org.xml.sax.SAXException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.*;
/**
* A EC schema loading utility that loads predefined EC schemas from XML file
*/
public class SchemaLoader {
private static final Log LOG = LogFactory.getLog(SchemaLoader.class.getName());
/**
* Load predefined ec schemas from configuration file. This file is
* expected to be in the XML format.
*/
public List<ECSchema> loadSchema(Configuration conf) {
File confFile = getSchemaFile(conf);
if (confFile == null) {
LOG.warn("Not found any predefined EC schema file");
return Collections.emptyList();
}
try {
return loadSchema(confFile);
} catch (ParserConfigurationException e) {
throw new RuntimeException("Failed to load schema file: " + confFile);
} catch (IOException e) {
throw new RuntimeException("Failed to load schema file: " + confFile);
} catch (SAXException e) {
throw new RuntimeException("Failed to load schema file: " + confFile);
}
}
private List<ECSchema> loadSchema(File schemaFile)
throws ParserConfigurationException, IOException, SAXException {
LOG.info("Loading predefined EC schema file " + schemaFile);
// Read and parse the schema file.
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
dbf.setIgnoringComments(true);
DocumentBuilder builder = dbf.newDocumentBuilder();
Document doc = builder.parse(schemaFile);
Element root = doc.getDocumentElement();
if (!"schemas".equals(root.getTagName())) {
throw new RuntimeException("Bad EC schema config file: " +
"top-level element not <schemas>");
}
NodeList elements = root.getChildNodes();
List<ECSchema> schemas = new ArrayList<ECSchema>();
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
if (node instanceof Element) {
Element element = (Element) node;
if ("schema".equals(element.getTagName())) {
ECSchema schema = loadSchema(element);
schemas.add(schema);
} else {
LOG.warn("Bad element in EC schema configuration file: " +
element.getTagName());
}
}
}
return schemas;
}
/**
* Path to the XML file containing predefined ec schemas. If the path is
* relative, it is searched for in the classpath.
*/
private File getSchemaFile(Configuration conf) {
String schemaFilePath = conf.get(
CommonConfigurationKeys.IO_ERASURECODE_SCHEMA_FILE_KEY,
CommonConfigurationKeys.IO_ERASURECODE_SCHEMA_FILE_DEFAULT);
File schemaFile = new File(schemaFilePath);
if (! schemaFile.isAbsolute()) {
URL url = Thread.currentThread().getContextClassLoader()
.getResource(schemaFilePath);
if (url == null) {
LOG.warn(schemaFilePath + " not found on the classpath.");
schemaFile = null;
} else if (! url.getProtocol().equalsIgnoreCase("file")) {
throw new RuntimeException(
"EC predefined schema file " + url +
" found on the classpath is not on the local filesystem.");
} else {
schemaFile = new File(url.getPath());
}
}
return schemaFile;
}
/**
* Loads a schema from a schema element in the configuration file
*/
private ECSchema loadSchema(Element element) {
String schemaName = element.getAttribute("name");
Map<String, String> ecOptions = new HashMap<String, String>();
NodeList fields = element.getChildNodes();
for (int i = 0; i < fields.getLength(); i++) {
Node fieldNode = fields.item(i);
if (fieldNode instanceof Element) {
Element field = (Element) fieldNode;
String tagName = field.getTagName();
String value = ((Text) field.getFirstChild()).getData().trim();
ecOptions.put(tagName, value);
}
}
ECSchema schema = new ECSchema(schemaName, ecOptions);
return schema;
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestSchemaLoader {
final static String TEST_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).getAbsolutePath();
final static String SCHEMA_FILE = new File(TEST_DIR, "test-ecschema")
.getAbsolutePath();
@Test
public void testLoadSchema() throws Exception {
PrintWriter out = new PrintWriter(new FileWriter(SCHEMA_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<schemas>");
out.println(" <schema name=\"RSk6m3\">");
out.println(" <k>6</k>");
out.println(" <m>3</m>");
out.println(" <codec>RS</codec>");
out.println(" </schema>");
out.println(" <schema name=\"RSk10m4\">");
out.println(" <k>10</k>");
out.println(" <m>4</m>");
out.println(" <codec>RS</codec>");
out.println(" </schema>");
out.println("</schemas>");
out.close();
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.IO_ERASURECODE_SCHEMA_FILE_KEY,
SCHEMA_FILE);
SchemaLoader schemaLoader = new SchemaLoader();
List<ECSchema> schemas = schemaLoader.loadSchema(conf);
assertEquals(2, schemas.size());
ECSchema schema1 = schemas.get(0);
assertEquals("RSk6m3", schema1.getSchemaName());
assertEquals(3, schema1.getOptions().size());
assertEquals(6, schema1.getNumDataUnits());
assertEquals(3, schema1.getNumParityUnits());
assertEquals("RS", schema1.getCodecName());
ECSchema schema2 = schemas.get(1);
assertEquals("RSk10m4", schema2.getSchemaName());
assertEquals(3, schema2.getOptions().size());
assertEquals(10, schema2.getNumDataUnits());
assertEquals(4, schema2.getNumParityUnits());
assertEquals("RS", schema2.getCodecName());
}
}