From 6be39230a67098e7d157925575b3b18bbf947717 Mon Sep 17 00:00:00 2001 From: Vrushali C Date: Tue, 15 Jan 2019 21:25:37 -0800 Subject: [PATCH 01/13] YARN-9150 Making TimelineSchemaCreator support different backends for Timeline Schema Creation in ATSv2. Contributed by Sushil Ks --- .../hadoop/yarn/conf/YarnConfiguration.java | 7 + .../storage/DataGeneratorForTest.java | 2 +- .../storage/TimelineSchemaCreator.java | 378 ------------------ 3 files changed, 8 insertions(+), 379 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c29707c82e..e1980c3a7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2697,6 +2697,13 @@ public static boolean isAclEnabled(Configuration conf) { "org.apache.hadoop.yarn.server.timelineservice.storage" + ".HBaseTimelineReaderImpl"; + public static final String TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS = + TIMELINE_SERVICE_PREFIX + "schema-creator.class"; + + public static final String DEFAULT_TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS = + "org.apache.hadoop.yarn.server.timelineservice.storage" + + ".HBaseTimelineSchemaCreator"; + /** * default schema prefix for hbase tables. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java index cf6a854424..476021a597 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java @@ -57,7 +57,7 @@ public static void createSchema(final Configuration conf) // the coprocessor class is loaded from classpath conf.set(YarnConfiguration.FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION, " "); // now create all tables - TimelineSchemaCreator.createAllTables(conf, false); + HBaseTimelineSchemaCreator.createAllTables(conf, false); } public static void loadApps(HBaseTestingUtility util, long ts) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java deleted file mode 100644 index af6f915132..0000000000 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW; -import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW; -import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTableRW; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This creates the schema for a hbase based backend for storing application - * timeline information. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public final class TimelineSchemaCreator { - private TimelineSchemaCreator() { - } - - final static String NAME = TimelineSchemaCreator.class.getSimpleName(); - private static final Logger LOG = - LoggerFactory.getLogger(TimelineSchemaCreator.class); - private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s"; - private static final String APP_METRICS_TTL_OPTION_SHORT = "ma"; - private static final String SUB_APP_METRICS_TTL_OPTION_SHORT = "msa"; - private static final String APP_TABLE_NAME_SHORT = "a"; - private static final String SUB_APP_TABLE_NAME_SHORT = "sa"; - private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f"; - private static final String ENTITY_METRICS_TTL_OPTION_SHORT = "me"; - private static final String ENTITY_TABLE_NAME_SHORT = "e"; - private static final String HELP_SHORT = "h"; - private static final String CREATE_TABLES_SHORT = "c"; - - public static void main(String[] args) throws Exception { - - LOG.info("Starting the schema creation"); - Configuration hbaseConf = - HBaseTimelineStorageUtils.getTimelineServiceHBaseConf( - new YarnConfiguration()); - // Grab input args and allow for -Dxyz style arguments - String[] otherArgs = new GenericOptionsParser(hbaseConf, args) - .getRemainingArgs(); - - // Grab the arguments we're looking for. - CommandLine commandLine = parseArgs(otherArgs); - - if (commandLine.hasOption(HELP_SHORT)) { - // -help option has the highest precedence - printUsage(); - } else if (commandLine.hasOption(CREATE_TABLES_SHORT)) { - // Grab the entityTableName argument - String entityTableName = commandLine.getOptionValue( - ENTITY_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(entityTableName)) { - hbaseConf.set(EntityTableRW.TABLE_NAME_CONF_NAME, entityTableName); - } - // Grab the entity metrics TTL - String entityTableMetricsTTL = commandLine.getOptionValue( - ENTITY_METRICS_TTL_OPTION_SHORT); - if (StringUtils.isNotBlank(entityTableMetricsTTL)) { - int entityMetricsTTL = Integer.parseInt(entityTableMetricsTTL); - new EntityTableRW().setMetricsTTL(entityMetricsTTL, hbaseConf); - } - // Grab the appToflowTableName argument - String appToflowTableName = commandLine.getOptionValue( - APP_TO_FLOW_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(appToflowTableName)) { - hbaseConf.set( - AppToFlowTableRW.TABLE_NAME_CONF_NAME, appToflowTableName); - } - // Grab the applicationTableName argument - String applicationTableName = commandLine.getOptionValue( - APP_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(applicationTableName)) { - hbaseConf.set(ApplicationTableRW.TABLE_NAME_CONF_NAME, - applicationTableName); - } - // Grab the application metrics TTL - String applicationTableMetricsTTL = commandLine.getOptionValue( - APP_METRICS_TTL_OPTION_SHORT); - if (StringUtils.isNotBlank(applicationTableMetricsTTL)) { - int appMetricsTTL = Integer.parseInt(applicationTableMetricsTTL); - new ApplicationTableRW().setMetricsTTL(appMetricsTTL, hbaseConf); - } - - // Grab the subApplicationTableName argument - String subApplicationTableName = commandLine.getOptionValue( - SUB_APP_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(subApplicationTableName)) { - hbaseConf.set(SubApplicationTableRW.TABLE_NAME_CONF_NAME, - subApplicationTableName); - } - // Grab the subApplication metrics TTL - String subApplicationTableMetricsTTL = commandLine - .getOptionValue(SUB_APP_METRICS_TTL_OPTION_SHORT); - if (StringUtils.isNotBlank(subApplicationTableMetricsTTL)) { - int subAppMetricsTTL = 
Integer.parseInt(subApplicationTableMetricsTTL); - new SubApplicationTableRW().setMetricsTTL(subAppMetricsTTL, hbaseConf); - } - - // create all table schemas in hbase - final boolean skipExisting = commandLine.hasOption( - SKIP_EXISTING_TABLE_OPTION_SHORT); - createAllSchemas(hbaseConf, skipExisting); - } else { - // print usage information if -create is not specified - printUsage(); - } - } - - /** - * Parse command-line arguments. - * - * @param args - * command line arguments passed to program. - * @return parsed command line. - * @throws ParseException - */ - private static CommandLine parseArgs(String[] args) throws ParseException { - Options options = new Options(); - - // Input - Option o = new Option(HELP_SHORT, "help", false, "print help information"); - o.setRequired(false); - options.addOption(o); - - o = new Option(CREATE_TABLES_SHORT, "create", false, - "a mandatory option to create hbase tables"); - o.setRequired(false); - options.addOption(o); - - o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true, - "entity table name"); - o.setArgName("entityTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(ENTITY_METRICS_TTL_OPTION_SHORT, "entityMetricsTTL", true, - "TTL for metrics column family"); - o.setArgName("entityMetricsTTL"); - o.setRequired(false); - options.addOption(o); - - o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true, - "app to flow table name"); - o.setArgName("appToflowTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true, - "application table name"); - o.setArgName("applicationTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(APP_METRICS_TTL_OPTION_SHORT, "applicationMetricsTTL", true, - "TTL for metrics column family"); - o.setArgName("applicationMetricsTTL"); - o.setRequired(false); - options.addOption(o); - - o = new Option(SUB_APP_TABLE_NAME_SHORT, "subApplicationTableName", true, - "subApplication table name"); - o.setArgName("subApplicationTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(SUB_APP_METRICS_TTL_OPTION_SHORT, "subApplicationMetricsTTL", - true, "TTL for metrics column family"); - o.setArgName("subApplicationMetricsTTL"); - o.setRequired(false); - options.addOption(o); - - // Options without an argument - // No need to set arg name since we do not need an argument here - o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable", - false, "skip existing Hbase tables and continue to create new tables"); - o.setRequired(false); - options.addOption(o); - - CommandLineParser parser = new PosixParser(); - CommandLine commandLine = null; - try { - commandLine = parser.parse(options, args); - } catch (Exception e) { - LOG.error("ERROR: " + e.getMessage() + "\n"); - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(NAME + " ", options, true); - System.exit(-1); - } - - return commandLine; - } - - private static void printUsage() { - StringBuilder usage = new StringBuilder("Command Usage: \n"); - usage.append("TimelineSchemaCreator [-help] Display help info" + - " for all commands. 
Or\n"); - usage.append("TimelineSchemaCreator -create [OPTIONAL_OPTIONS]" + - " Create hbase tables.\n\n"); - usage.append("The Optional options for creating tables include: \n"); - usage.append("[-entityTableName ] " + - "The name of the Entity table\n"); - usage.append("[-entityMetricsTTL ]" + - " TTL for metrics in the Entity table\n"); - usage.append("[-appToflowTableName ]" + - " The name of the AppToFlow table\n"); - usage.append("[-applicationTableName ]" + - " The name of the Application table\n"); - usage.append("[-applicationMetricsTTL ]" + - " TTL for metrics in the Application table\n"); - usage.append("[-subApplicationTableName ]" + - " The name of the SubApplication table\n"); - usage.append("[-subApplicationMetricsTTL " + - " ]" + - " TTL for metrics in the SubApplication table\n"); - usage.append("[-skipExistingTable] Whether to skip existing" + - " hbase tables\n"); - System.out.println(usage.toString()); - } - - /** - * Create all table schemas and log success or exception if failed. - * @param hbaseConf the hbase configuration to create tables with - * @param skipExisting whether to skip existing hbase tables - */ - private static void createAllSchemas(Configuration hbaseConf, - boolean skipExisting) { - List exceptions = new ArrayList<>(); - try { - if (skipExisting) { - LOG.info("Will skip existing tables and continue on htable creation " - + "exceptions!"); - } - createAllTables(hbaseConf, skipExisting); - LOG.info("Successfully created HBase schema. "); - } catch (IOException e) { - LOG.error("Error in creating hbase tables: ", e); - exceptions.add(e); - } - - if (exceptions.size() > 0) { - LOG.warn("Schema creation finished with the following exceptions"); - for (Exception e : exceptions) { - LOG.warn(e.getMessage()); - } - System.exit(-1); - } else { - LOG.info("Schema creation finished successfully"); - } - } - - @VisibleForTesting - public static void createAllTables(Configuration hbaseConf, - boolean skipExisting) throws IOException { - - Connection conn = null; - try { - conn = ConnectionFactory.createConnection(hbaseConf); - Admin admin = conn.getAdmin(); - if (admin == null) { - throw new IOException("Cannot create table since admin is null"); - } - try { - new EntityTableRW().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new AppToFlowTableRW().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new ApplicationTableRW().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new FlowRunTableRW().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new FlowActivityTableRW().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new SubApplicationTableRW().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new DomainTableRW().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and 
continue on: " + e.getMessage()); - } else { - throw e; - } - } - } finally { - if (conn != null) { - conn.close(); - } - } - } - - -} From 713ded6b15dc0b5e4205a7812a62225377e0b32b Mon Sep 17 00:00:00 2001 From: Vrushali C Date: Tue, 15 Jan 2019 21:28:10 -0800 Subject: [PATCH 02/13] YARN-9150 Making TimelineSchemaCreator support different backends for Timeline Schema Creation in ATSv2. Contributed by Sushil Ks --- .../storage/HBaseTimelineSchemaCreator.java | 378 ++++++++++++++++++ .../storage/SchemaCreator.java | 28 ++ .../storage/TimelineSchemaCreator.java | 80 ++++ .../storage/DummyTimelineSchemaCreator.java | 29 ++ .../storage/TestTimelineSchemaCreator.java | 41 ++ 5 files changed, 556 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineSchemaCreator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/SchemaCreator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DummyTimelineSchemaCreator.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineSchemaCreator.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineSchemaCreator.java new file mode 100644 index 0000000000..b1593c5101 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineSchemaCreator.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTableRW; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This creates the schema for a hbase based backend for storing application + * timeline information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class HBaseTimelineSchemaCreator implements SchemaCreator { + public HBaseTimelineSchemaCreator() { + } + + final static String NAME = HBaseTimelineSchemaCreator.class.getSimpleName(); + private static final Logger LOG = + LoggerFactory.getLogger(HBaseTimelineSchemaCreator.class); + private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s"; + private static final String APP_METRICS_TTL_OPTION_SHORT = "ma"; + private static final String SUB_APP_METRICS_TTL_OPTION_SHORT = "msa"; + private static final String APP_TABLE_NAME_SHORT = "a"; + private static final String SUB_APP_TABLE_NAME_SHORT = "sa"; + private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f"; + private static final String ENTITY_METRICS_TTL_OPTION_SHORT = "me"; + private static final String ENTITY_TABLE_NAME_SHORT = "e"; + private static final String HELP_SHORT = "h"; + private static final String CREATE_TABLES_SHORT = "c"; + + public void createTimelineSchema(String[] args) throws Exception { + + LOG.info("Starting the schema creation"); + Configuration hbaseConf = + HBaseTimelineStorageUtils.getTimelineServiceHBaseConf( + new YarnConfiguration()); + // Grab input args and allow for -Dxyz style arguments + String[] otherArgs = new GenericOptionsParser(hbaseConf, args) + .getRemainingArgs(); + + // Grab the arguments we're looking for. 
+ CommandLine commandLine = parseArgs(otherArgs); + + if (commandLine.hasOption(HELP_SHORT)) { + // -help option has the highest precedence + printUsage(); + } else if (commandLine.hasOption(CREATE_TABLES_SHORT)) { + // Grab the entityTableName argument + String entityTableName = commandLine.getOptionValue( + ENTITY_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(entityTableName)) { + hbaseConf.set(EntityTableRW.TABLE_NAME_CONF_NAME, entityTableName); + } + // Grab the entity metrics TTL + String entityTableMetricsTTL = commandLine.getOptionValue( + ENTITY_METRICS_TTL_OPTION_SHORT); + if (StringUtils.isNotBlank(entityTableMetricsTTL)) { + int entityMetricsTTL = Integer.parseInt(entityTableMetricsTTL); + new EntityTableRW().setMetricsTTL(entityMetricsTTL, hbaseConf); + } + // Grab the appToflowTableName argument + String appToflowTableName = commandLine.getOptionValue( + APP_TO_FLOW_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(appToflowTableName)) { + hbaseConf.set( + AppToFlowTableRW.TABLE_NAME_CONF_NAME, appToflowTableName); + } + // Grab the applicationTableName argument + String applicationTableName = commandLine.getOptionValue( + APP_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(applicationTableName)) { + hbaseConf.set(ApplicationTableRW.TABLE_NAME_CONF_NAME, + applicationTableName); + } + // Grab the application metrics TTL + String applicationTableMetricsTTL = commandLine.getOptionValue( + APP_METRICS_TTL_OPTION_SHORT); + if (StringUtils.isNotBlank(applicationTableMetricsTTL)) { + int appMetricsTTL = Integer.parseInt(applicationTableMetricsTTL); + new ApplicationTableRW().setMetricsTTL(appMetricsTTL, hbaseConf); + } + + // Grab the subApplicationTableName argument + String subApplicationTableName = commandLine.getOptionValue( + SUB_APP_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(subApplicationTableName)) { + hbaseConf.set(SubApplicationTableRW.TABLE_NAME_CONF_NAME, + subApplicationTableName); + } + // Grab the subApplication metrics TTL + String subApplicationTableMetricsTTL = commandLine + .getOptionValue(SUB_APP_METRICS_TTL_OPTION_SHORT); + if (StringUtils.isNotBlank(subApplicationTableMetricsTTL)) { + int subAppMetricsTTL = Integer.parseInt(subApplicationTableMetricsTTL); + new SubApplicationTableRW().setMetricsTTL(subAppMetricsTTL, hbaseConf); + } + + // create all table schemas in hbase + final boolean skipExisting = commandLine.hasOption( + SKIP_EXISTING_TABLE_OPTION_SHORT); + createAllSchemas(hbaseConf, skipExisting); + } else { + // print usage information if -create is not specified + printUsage(); + } + } + + /** + * Parse command-line arguments. + * + * @param args + * command line arguments passed to program. + * @return parsed command line. 
+ * @throws ParseException + */ + private static CommandLine parseArgs(String[] args) throws ParseException { + Options options = new Options(); + + // Input + Option o = new Option(HELP_SHORT, "help", false, "print help information"); + o.setRequired(false); + options.addOption(o); + + o = new Option(CREATE_TABLES_SHORT, "create", false, + "a mandatory option to create hbase tables"); + o.setRequired(false); + options.addOption(o); + + o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true, + "entity table name"); + o.setArgName("entityTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(ENTITY_METRICS_TTL_OPTION_SHORT, "entityMetricsTTL", true, + "TTL for metrics column family"); + o.setArgName("entityMetricsTTL"); + o.setRequired(false); + options.addOption(o); + + o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true, + "app to flow table name"); + o.setArgName("appToflowTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true, + "application table name"); + o.setArgName("applicationTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(APP_METRICS_TTL_OPTION_SHORT, "applicationMetricsTTL", true, + "TTL for metrics column family"); + o.setArgName("applicationMetricsTTL"); + o.setRequired(false); + options.addOption(o); + + o = new Option(SUB_APP_TABLE_NAME_SHORT, "subApplicationTableName", true, + "subApplication table name"); + o.setArgName("subApplicationTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(SUB_APP_METRICS_TTL_OPTION_SHORT, "subApplicationMetricsTTL", + true, "TTL for metrics column family"); + o.setArgName("subApplicationMetricsTTL"); + o.setRequired(false); + options.addOption(o); + + // Options without an argument + // No need to set arg name since we do not need an argument here + o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable", + false, "skip existing Hbase tables and continue to create new tables"); + o.setRequired(false); + options.addOption(o); + + CommandLineParser parser = new PosixParser(); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + } catch (Exception e) { + LOG.error("ERROR: " + e.getMessage() + "\n"); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(NAME + " ", options, true); + System.exit(-1); + } + + return commandLine; + } + + private static void printUsage() { + StringBuilder usage = new StringBuilder("Command Usage: \n"); + usage.append("TimelineSchemaCreator [-help] Display help info" + + " for all commands. 
Or\n"); + usage.append("TimelineSchemaCreator -create [OPTIONAL_OPTIONS]" + + " Create hbase tables.\n\n"); + usage.append("The Optional options for creating tables include: \n"); + usage.append("[-entityTableName ] " + + "The name of the Entity table\n"); + usage.append("[-entityMetricsTTL ]" + + " TTL for metrics in the Entity table\n"); + usage.append("[-appToflowTableName ]" + + " The name of the AppToFlow table\n"); + usage.append("[-applicationTableName ]" + + " The name of the Application table\n"); + usage.append("[-applicationMetricsTTL ]" + + " TTL for metrics in the Application table\n"); + usage.append("[-subApplicationTableName ]" + + " The name of the SubApplication table\n"); + usage.append("[-subApplicationMetricsTTL " + + " ]" + + " TTL for metrics in the SubApplication table\n"); + usage.append("[-skipExistingTable] Whether to skip existing" + + " hbase tables\n"); + System.out.println(usage.toString()); + } + + /** + * Create all table schemas and log success or exception if failed. + * @param hbaseConf the hbase configuration to create tables with + * @param skipExisting whether to skip existing hbase tables + */ + private static void createAllSchemas(Configuration hbaseConf, + boolean skipExisting) { + List exceptions = new ArrayList<>(); + try { + if (skipExisting) { + LOG.info("Will skip existing tables and continue on htable creation " + + "exceptions!"); + } + createAllTables(hbaseConf, skipExisting); + LOG.info("Successfully created HBase schema. "); + } catch (IOException e) { + LOG.error("Error in creating hbase tables: ", e); + exceptions.add(e); + } + + if (exceptions.size() > 0) { + LOG.warn("Schema creation finished with the following exceptions"); + for (Exception e : exceptions) { + LOG.warn(e.getMessage()); + } + System.exit(-1); + } else { + LOG.info("Schema creation finished successfully"); + } + } + + @VisibleForTesting + public static void createAllTables(Configuration hbaseConf, + boolean skipExisting) throws IOException { + + Connection conn = null; + try { + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + if (admin == null) { + throw new IOException("Cannot create table since admin is null"); + } + try { + new EntityTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new AppToFlowTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new ApplicationTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new FlowRunTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new FlowActivityTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new SubApplicationTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new DomainTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and 
continue on: " + e.getMessage()); + } else { + throw e; + } + } + } finally { + if (conn != null) { + conn.close(); + } + } + } + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/SchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/SchemaCreator.java new file mode 100644 index 0000000000..d04f85d253 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/SchemaCreator.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +/** + * This interface is for creating Timeline Schema. The backend for Timeline + * Service have to implement this. + */ +public interface SchemaCreator { + + void createTimelineSchema(String[] args) throws Exception; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java new file mode 100644 index 0000000000..20ae8bfe0f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This creates the timeline schema for storing application timeline + * information. Each backend has to implement the {@link SchemaCreator} for + * creating the schema in its backend and should be configured in yarn-site.xml. + */ +public class TimelineSchemaCreator extends Configured implements Tool { + private static final Logger LOG = + LoggerFactory.getLogger(TimelineSchemaCreator.class); + + public static void main(String[] args) { + try { + int status = ToolRunner.run(new YarnConfiguration(), + new TimelineSchemaCreator(), args); + System.exit(status); + } catch (Exception e) { + LOG.error("Error while creating Timeline Schema : ", e); + } + } + + @Override + public int run(String[] args) throws Exception { + Configuration conf = getConf(); + return createTimelineSchema(args, conf); + } + + @VisibleForTesting + int createTimelineSchema(String[] args, Configuration conf) throws Exception { + String schemaCreatorClassName = conf.get( + YarnConfiguration.TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS); + LOG.info("Using {} for creating Timeline Service Schema ", + schemaCreatorClassName); + try { + Class schemaCreatorClass = Class.forName(schemaCreatorClassName); + if (SchemaCreator.class.isAssignableFrom(schemaCreatorClass)) { + SchemaCreator schemaCreator = (SchemaCreator) ReflectionUtils + .newInstance(schemaCreatorClass, conf); + schemaCreator.createTimelineSchema(args); + return 0; + } else { + throw new YarnRuntimeException("Class: " + schemaCreatorClassName + + " not instance of " + SchemaCreator.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate TimelineReader: " + + schemaCreatorClassName, e); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DummyTimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DummyTimelineSchemaCreator.java new file mode 100644 index 0000000000..98304fc815 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DummyTimelineSchemaCreator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +/** + * Dummy Implementation of {@link SchemaCreator} for test. + */ +public class DummyTimelineSchemaCreator implements SchemaCreator { + + @Override + public void createTimelineSchema(String[] args) { + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineSchemaCreator.java new file mode 100644 index 0000000000..16b6d995d4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineSchemaCreator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for {@link TimelineSchemaCreator}. + */ +public class TestTimelineSchemaCreator { + + @Test + public void testTimelineSchemaCreation() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS, + "org.apache.hadoop.yarn.server.timelineservice.storage" + + ".DummyTimelineSchemaCreator"); + TimelineSchemaCreator timelineSchemaCreator = new TimelineSchemaCreator(); + Assert.assertEquals(0, timelineSchemaCreator + .createTimelineSchema(new String[]{}, conf)); + } +} \ No newline at end of file From 104ef5df36fd45f1c2d07a2f3d441263aa85e22e Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Wed, 16 Jan 2019 14:35:07 +0900 Subject: [PATCH 03/13] YARN-8747. [UI2] YARN UI2 page loading failed due to js error under some time zone configuration. Contributed by collinma. 
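Note on the pluggable schema creator introduced in PATCH 02 above: the new TimelineSchemaCreator tool reads yarn.timeline-service.schema-creator.class from the configuration, reflectively instantiates the named class, verifies it implements SchemaCreator, and delegates schema creation to it, defaulting to HBaseTimelineSchemaCreator. The sketch below illustrates only that reflective plug-in pattern in plain, self-contained Java; StorageSchemaCreator, NoOpSchemaCreator, and the hard-coded map standing in for a Hadoop Configuration are hypothetical names, not the actual Hadoop classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the SchemaCreator plug-in point added in PATCH 02. */
interface StorageSchemaCreator {
  void createTimelineSchema(String[] args) throws Exception;
}

/** Hypothetical no-op backend, analogous in spirit to DummyTimelineSchemaCreator. */
class NoOpSchemaCreator implements StorageSchemaCreator {
  @Override
  public void createTimelineSchema(String[] args) {
    System.out.println("No-op schema creation for " + args.length + " argument(s)");
  }
}

/** Minimal sketch of the reflective lookup performed by the new tool. */
public final class SchemaCreatorLoader {
  // Stand-in for a Hadoop Configuration: config key -> fully qualified class name.
  private static final Map<String, String> CONF = new HashMap<>();

  public static void main(String[] args) throws Exception {
    CONF.put("yarn.timeline-service.schema-creator.class", "NoOpSchemaCreator");

    // Fall back to the default creator if the key is not set.
    String className = CONF.getOrDefault(
        "yarn.timeline-service.schema-creator.class",
        "org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineSchemaCreator");

    Class<?> clazz = Class.forName(className);
    if (!StorageSchemaCreator.class.isAssignableFrom(clazz)) {
      throw new IllegalArgumentException(
          className + " is not a " + StorageSchemaCreator.class.getName());
    }
    StorageSchemaCreator creator =
        (StorageSchemaCreator) clazz.getDeclaredConstructor().newInstance();
    creator.createTimelineSchema(args);
  }
}
```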
--- .../hadoop-yarn/hadoop-yarn-ui/src/main/webapp/bower.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-ui/src/main/webapp/bower.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-ui/src/main/webapp/bower.json index b7591a5b62..8fa96681d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-ui/src/main/webapp/bower.json +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-ui/src/main/webapp/bower.json @@ -13,7 +13,7 @@ "qunit": "1.19.0", "jquery-ui": "1.11.4", "moment": "2.12.0", - "moment-timezone": "0.5.0", + "moment-timezone": "0.5.1", "more-js": "0.8.2", "bootstrap": "3.3.6", "d3": "~3.5.6", From f048512bb89f4d1edbb54360622adc61ffacbde3 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Wed, 16 Jan 2019 10:14:22 -0800 Subject: [PATCH 04/13] HDFS-14192. Track missing DFS operations in Statistics and StorageStatistics. Contributed by Ayush Saxena. --- .../hadoop/hdfs/DFSOpsCountStatistics.java | 14 ++ .../hadoop/hdfs/DistributedFileSystem.java | 28 ++++ .../hdfs/TestDistributedFileSystem.java | 121 ++++++++++++++++++ 3 files changed, 163 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java index b9852bad56..2113ae5c63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java @@ -41,12 +41,15 @@ public class DFSOpsCountStatistics extends StorageStatistics { /** This is for counting distributed file system operations. */ public enum OpType { + ADD_CACHE_DIRECTIVE("op_add_cache_directive"), + ADD_CACHE_POOL("op_add_cache_pool"), ADD_EC_POLICY("op_add_ec_policy"), ALLOW_SNAPSHOT("op_allow_snapshot"), APPEND(CommonStatisticNames.OP_APPEND), CONCAT("op_concat"), COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE), CREATE(CommonStatisticNames.OP_CREATE), + CREATE_ENCRYPTION_ZONE("op_create_encryption_zone"), CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE), CREATE_SNAPSHOT("op_create_snapshot"), CREATE_SYM_LINK("op_create_symlink"), @@ -61,6 +64,7 @@ public enum OpType { GET_EC_CODECS("op_get_ec_codecs"), GET_EC_POLICY("op_get_ec_policy"), GET_EC_POLICIES("op_get_ec_policies"), + GET_ENCRYPTION_ZONE("op_get_encryption_zone"), GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations"), GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM), GET_FILE_LINK_STATUS("op_get_file_link_status"), @@ -72,8 +76,13 @@ public enum OpType { GET_STORAGE_POLICY("op_get_storage_policy"), GET_TRASH_ROOT("op_get_trash_root"), GET_XATTR("op_get_xattr"), + LIST_CACHE_DIRECTIVE("op_list_cache_directive"), + LIST_CACHE_POOL("op_list_cache_pool"), + LIST_ENCRYPTION_ZONE("op_list_encryption_zone"), LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS), LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS), + MODIFY_CACHE_POOL("op_modify_cache_pool"), + MODIFY_CACHE_DIRECTIVE("op_modify_cache_directive"), MKDIRS(CommonStatisticNames.OP_MKDIRS), MODIFY_ACL_ENTRIES(CommonStatisticNames.OP_MODIFY_ACL_ENTRIES), OPEN(CommonStatisticNames.OP_OPEN), @@ -81,16 +90,21 @@ public enum OpType { PRIMITIVE_MKDIR("op_primitive_mkdir"), REMOVE_ACL(CommonStatisticNames.OP_REMOVE_ACL), REMOVE_ACL_ENTRIES(CommonStatisticNames.OP_REMOVE_ACL_ENTRIES), + 
REMOVE_CACHE_DIRECTIVE("op_remove_cache_directive"), + REMOVE_CACHE_POOL("op_remove_cache_pool"), REMOVE_DEFAULT_ACL(CommonStatisticNames.OP_REMOVE_DEFAULT_ACL), REMOVE_EC_POLICY("op_remove_ec_policy"), REMOVE_XATTR("op_remove_xattr"), RENAME(CommonStatisticNames.OP_RENAME), RENAME_SNAPSHOT("op_rename_snapshot"), RESOLVE_LINK("op_resolve_link"), + SATISFY_STORAGE_POLICY("op_satisfy_storagepolicy"), SET_ACL(CommonStatisticNames.OP_SET_ACL), SET_EC_POLICY("op_set_ec_policy"), SET_OWNER(CommonStatisticNames.OP_SET_OWNER), SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION), + SET_QUOTA_BYTSTORAGEYPE("op_set_quota_bystoragetype"), + SET_QUOTA_USAGE("op_set_quota_usage"), SET_REPLICATION("op_set_replication"), SET_STORAGE_POLICY("op_set_storagePolicy"), SET_TIMES(CommonStatisticNames.OP_SET_TIMES), diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 2b0b2c9f67..a1af465b24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -1002,6 +1002,8 @@ public QuotaUsage next(final FileSystem fs, final Path p) */ public void setQuota(Path src, final long namespaceQuota, final long storagespaceQuota) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_QUOTA_USAGE); Path absF = fixRelativePart(src); new FileSystemLinkResolver() { @Override @@ -1030,6 +1032,8 @@ public Void next(final FileSystem fs, final Path p) public void setQuotaByStorageType(Path src, final StorageType type, final long quota) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_QUOTA_BYTSTORAGEYPE); Path absF = fixRelativePart(src); new FileSystemLinkResolver() { @Override @@ -2222,6 +2226,8 @@ public long addCacheDirective(CacheDirectiveInfo info) throws IOException { */ public long addCacheDirective( CacheDirectiveInfo info, EnumSet flags) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.ADD_CACHE_DIRECTIVE); Preconditions.checkNotNull(info.getPath()); Path path = new Path(getPathName(fixRelativePart(info.getPath()))). makeQualified(getUri(), getWorkingDirectory()); @@ -2249,6 +2255,8 @@ public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException { */ public void modifyCacheDirective( CacheDirectiveInfo info, EnumSet flags) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MODIFY_CACHE_DIRECTIVE); if (info.getPath() != null) { info = new CacheDirectiveInfo.Builder(info). setPath(new Path(getPathName(fixRelativePart(info.getPath()))). 
@@ -2265,6 +2273,8 @@ public void modifyCacheDirective( */ public void removeCacheDirective(long id) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_CACHE_DIRECTIVE); dfs.removeCacheDirective(id); } @@ -2277,6 +2287,8 @@ public void removeCacheDirective(long id) */ public RemoteIterator listCacheDirectives( CacheDirectiveInfo filter) throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_CACHE_DIRECTIVE); if (filter == null) { filter = new CacheDirectiveInfo.Builder().build(); } @@ -2317,6 +2329,8 @@ public CacheDirectiveEntry next() throws IOException { * If the request could not be completed. */ public void addCachePool(CachePoolInfo info) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.ADD_CACHE_POOL); CachePoolInfo.validate(info); dfs.addCachePool(info); } @@ -2330,6 +2344,8 @@ public void addCachePool(CachePoolInfo info) throws IOException { * If the request could not be completed. */ public void modifyCachePool(CachePoolInfo info) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MODIFY_CACHE_POOL); CachePoolInfo.validate(info); dfs.modifyCachePool(info); } @@ -2343,6 +2359,8 @@ public void modifyCachePool(CachePoolInfo info) throws IOException { * if the cache pool did not exist, or could not be removed. */ public void removeCachePool(String poolName) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_CACHE_POOL); CachePoolInfo.validateName(poolName); dfs.removeCachePool(poolName); } @@ -2356,6 +2374,8 @@ public void removeCachePool(String poolName) throws IOException { * If there was an error listing cache pools. 
*/ public RemoteIterator listCachePools() throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_CACHE_POOL); return dfs.listCachePools(); } @@ -2497,6 +2517,8 @@ public AclStatus next(final FileSystem fs, final Path p) /* HDFS only */ public void createEncryptionZone(final Path path, final String keyName) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_ENCRYPTION_ZONE); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2524,6 +2546,8 @@ public Void next(final FileSystem fs, final Path p) throws IOException { /* HDFS only */ public EncryptionZone getEZForPath(final Path path) throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_ENCRYPTION_ZONE); Preconditions.checkNotNull(path); Path absF = fixRelativePart(path); return new FileSystemLinkResolver() { @@ -2551,6 +2575,8 @@ public EncryptionZone next(final FileSystem fs, final Path p) /* HDFS only */ public RemoteIterator listEncryptionZones() throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_ENCRYPTION_ZONE); return dfs.listEncryptionZones(); } @@ -2875,6 +2901,8 @@ public Void next(final FileSystem fs, final Path p) throws IOException { * @throws IOException */ public void satisfyStoragePolicy(final Path path) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SATISFY_STORAGE_POLICY); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 97ced7a8c3..135cb4f358 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -37,6 +37,7 @@ import java.net.ServerSocket; import java.net.SocketTimeoutException; import java.net.URI; +import java.security.NoSuchAlgorithmException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; @@ -46,13 +47,17 @@ import java.util.List; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; +import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -78,10 +83,13 @@ import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import 
org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -775,6 +783,119 @@ public void testStatistics() throws IOException { } } + @Test + public void testStatistics2() throws IOException, NoSuchAlgorithmException { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, + StoragePolicySatisfierMode.EXTERNAL.toString()); + File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString()); + final Path jksPath = new Path(tmpDir.toString(), "test.jks"); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()); + + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) { + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + Path dir = new Path("/testStat"); + dfs.mkdirs(dir); + int readOps = 0; + int writeOps = 0; + FileSystem.clearStatistics(); + + // Quota Commands. + long opCount = getOpStatistics(OpType.SET_QUOTA_USAGE); + dfs.setQuota(dir, 100, 1000); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.SET_QUOTA_USAGE, opCount + 1); + + opCount = getOpStatistics(OpType.SET_QUOTA_BYTSTORAGEYPE); + dfs.setQuotaByStorageType(dir, StorageType.DEFAULT, 2000); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.SET_QUOTA_BYTSTORAGEYPE, opCount + 1); + + opCount = getOpStatistics(OpType.GET_QUOTA_USAGE); + dfs.getQuotaUsage(dir); + checkStatistics(dfs, ++readOps, writeOps, 0); + checkOpStatistics(OpType.GET_QUOTA_USAGE, opCount + 1); + + // Satisfy Storage Policy. + opCount = getOpStatistics(OpType.SATISFY_STORAGE_POLICY); + dfs.satisfyStoragePolicy(dir); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.SATISFY_STORAGE_POLICY, opCount + 1); + + // Cache Commands. 
+ CachePoolInfo cacheInfo = + new CachePoolInfo("pool1").setMode(new FsPermission((short) 0)); + + opCount = getOpStatistics(OpType.ADD_CACHE_POOL); + dfs.addCachePool(cacheInfo); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.ADD_CACHE_POOL, opCount + 1); + + CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder() + .setPath(new Path(".")).setPool("pool1").build(); + + opCount = getOpStatistics(OpType.ADD_CACHE_DIRECTIVE); + long id = dfs.addCacheDirective(directive); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.ADD_CACHE_DIRECTIVE, opCount + 1); + + opCount = getOpStatistics(OpType.LIST_CACHE_DIRECTIVE); + dfs.listCacheDirectives(null); + checkStatistics(dfs, ++readOps, writeOps, 0); + checkOpStatistics(OpType.LIST_CACHE_DIRECTIVE, opCount + 1); + + opCount = getOpStatistics(OpType.MODIFY_CACHE_DIRECTIVE); + dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id) + .setReplication((short) 2).build()); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.MODIFY_CACHE_DIRECTIVE, opCount + 1); + + opCount = getOpStatistics(OpType.REMOVE_CACHE_DIRECTIVE); + dfs.removeCacheDirective(id); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.REMOVE_CACHE_DIRECTIVE, opCount + 1); + + opCount = getOpStatistics(OpType.MODIFY_CACHE_POOL); + dfs.modifyCachePool(cacheInfo); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.MODIFY_CACHE_POOL, opCount + 1); + + opCount = getOpStatistics(OpType.LIST_CACHE_POOL); + dfs.listCachePools(); + checkStatistics(dfs, ++readOps, writeOps, 0); + checkOpStatistics(OpType.LIST_CACHE_POOL, opCount + 1); + + opCount = getOpStatistics(OpType.REMOVE_CACHE_POOL); + dfs.removeCachePool(cacheInfo.getPoolName()); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.REMOVE_CACHE_POOL, opCount + 1); + + // Crypto Commands. + final KeyProvider provider = + cluster.getNameNode().getNamesystem().getProvider(); + final KeyProvider.Options options = KeyProvider.options(conf); + provider.createKey("key", options); + provider.flush(); + + opCount = getOpStatistics(OpType.CREATE_ENCRYPTION_ZONE); + dfs.createEncryptionZone(dir, "key"); + checkStatistics(dfs, readOps, ++writeOps, 0); + checkOpStatistics(OpType.CREATE_ENCRYPTION_ZONE, opCount + 1); + + opCount = getOpStatistics(OpType.LIST_ENCRYPTION_ZONE); + dfs.listEncryptionZones(); + checkStatistics(dfs, ++readOps, writeOps, 0); + checkOpStatistics(OpType.LIST_ENCRYPTION_ZONE, opCount + 1); + + opCount = getOpStatistics(OpType.GET_ENCRYPTION_ZONE); + dfs.getEZForPath(dir); + checkStatistics(dfs, ++readOps, writeOps, 0); + checkOpStatistics(OpType.GET_ENCRYPTION_ZONE, opCount + 1); + } + } + @Test public void testECStatistics() throws IOException { try (MiniDFSCluster cluster = From 54b11de2c0254eb0b8dc45dc9ce67758697df846 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Wed, 16 Jan 2019 17:32:59 -0800 Subject: [PATCH 05/13] HDDS-898. Continue token should contain the previous dir in Ozone s3g object list. Contributed by Elek Marton. 
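The diff below for PATCH 05 replaces the bare last-key continuation string with a ContinueToken that also carries the last directory (common prefix) already returned, so a continued listing can resume without repeating or dropping a prefix; BucketEndpoint uses decodeFromString, getLastKey, getLastDir, and encodeToString on it. Since the ContinueToken source itself is cut off at the end of this section, the following is only a self-contained sketch of the same idea: the ListContinueToken name, the NUL separator, and the Base64 encoding are illustrative assumptions, not Ozone's actual token format.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Minimal sketch of a list-continuation token carrying both the last returned
 * key and the last emitted common prefix ("dir"). Encoding is illustrative only.
 */
public final class ListContinueToken {
  private static final String SEPARATOR = "\u0000";

  private final String lastKey;
  private final String lastDir; // null if no common prefix has been emitted yet

  public ListContinueToken(String lastKey, String lastDir) {
    this.lastKey = lastKey;
    this.lastDir = lastDir;
  }

  public String getLastKey() { return lastKey; }
  public String getLastDir() { return lastDir; }

  /** Serializes both fields into one opaque, URL-safe string. */
  public String encodeToString() {
    String raw = lastKey + SEPARATOR + (lastDir == null ? "" : lastDir);
    return Base64.getUrlEncoder().withoutPadding()
        .encodeToString(raw.getBytes(StandardCharsets.UTF_8));
  }

  /** Restores a token; returns null for a missing client-supplied value. */
  public static ListContinueToken decodeFromString(String encoded) {
    if (encoded == null || encoded.isEmpty()) {
      return null;
    }
    String raw = new String(Base64.getUrlDecoder().decode(encoded),
        StandardCharsets.UTF_8);
    int idx = raw.indexOf(SEPARATOR);
    String key = idx < 0 ? raw : raw.substring(0, idx);
    String dir = (idx < 0 || idx == raw.length() - 1) ? null : raw.substring(idx + 1);
    return new ListContinueToken(key, dir);
  }

  public static void main(String[] args) {
    ListContinueToken next = new ListContinueToken("dir1/file9", "dir1");
    String token = next.encodeToString();
    ListContinueToken resumed = decodeFromString(token);
    // Listing resumes after "dir1/file9" without re-emitting the "dir1/" prefix.
    System.out.println(resumed.getLastKey() + " / " + resumed.getLastDir());
  }
}
```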
--- .../ozone/s3/endpoint/BucketEndpoint.java | 20 +- .../hadoop/ozone/s3/util/ContinueToken.java | 173 ++++++++++++++++++ .../apache/hadoop/ozone/s3/util/S3utils.java | 67 +------ .../ozone/s3/endpoint/TestBucketGet.java | 49 ++++- .../ozone/s3/util/TestContinueToken.java | 50 +++++ 5 files changed, 286 insertions(+), 73 deletions(-) create mode 100644 hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/ContinueToken.java create mode 100644 hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestContinueToken.java diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java index d3b36b4837..67e25c3541 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java @@ -46,17 +46,16 @@ import org.apache.hadoop.ozone.s3.endpoint.MultiDeleteResponse.Error; import org.apache.hadoop.ozone.s3.exception.OS3Exception; import org.apache.hadoop.ozone.s3.exception.S3ErrorTable; +import org.apache.hadoop.ozone.s3.util.ContinueToken; +import org.apache.hadoop.ozone.s3.util.S3StorageType; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.ozone.s3.util.S3StorageType; -import org.apache.hadoop.ozone.s3.util.S3utils; +import static org.apache.hadoop.ozone.s3.util.S3Consts.ENCODING_TYPE; import org.apache.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.ozone.s3.util.S3Consts.ENCODING_TYPE; - /** * Bucket level rest endpoints. 
*/ @@ -104,16 +103,17 @@ public Response list( Iterator ozoneKeyIterator; - String decodedToken = S3utils.decodeContinueToken(continueToken); + ContinueToken decodedToken = + ContinueToken.decodeFromString(continueToken); if (startAfter != null && continueToken != null) { // If continuation token and start after both are provided, then we // ignore start After - ozoneKeyIterator = bucket.listKeys(prefix, decodedToken); + ozoneKeyIterator = bucket.listKeys(prefix, decodedToken.getLastKey()); } else if (startAfter != null && continueToken == null) { ozoneKeyIterator = bucket.listKeys(prefix, startAfter); } else if (startAfter == null && continueToken != null){ - ozoneKeyIterator = bucket.listKeys(prefix, decodedToken); + ozoneKeyIterator = bucket.listKeys(prefix, decodedToken.getLastKey()); } else { ozoneKeyIterator = bucket.listKeys(prefix); } @@ -130,6 +130,9 @@ public Response list( response.setContinueToken(continueToken); String prevDir = null; + if (continueToken != null) { + prevDir = decodedToken.getLastDir(); + } String lastKey = null; int count = 0; while (ozoneKeyIterator.hasNext()) { @@ -176,7 +179,8 @@ public Response list( response.setTruncated(false); } else if(ozoneKeyIterator.hasNext()) { response.setTruncated(true); - response.setNextToken(S3utils.generateContinueToken(lastKey)); + ContinueToken nextToken = new ContinueToken(lastKey, prevDir); + response.setNextToken(nextToken.encodeToString()); } else { response.setTruncated(false); } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/ContinueToken.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/ContinueToken.java new file mode 100644 index 0000000000..92ae6d4473 --- /dev/null +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/ContinueToken.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.s3.util; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +import org.apache.hadoop.ozone.s3.exception.OS3Exception; +import org.apache.hadoop.ozone.s3.exception.S3ErrorTable; + +import com.google.common.base.Preconditions; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; + +/** + * Token which holds enough information to continue the key iteration. + */ +public class ContinueToken { + + private String lastKey; + + private String lastDir; + + private static final String CONTINUE_TOKEN_SEPERATOR = "-"; + + public ContinueToken(String lastKey, String lastDir) { + Preconditions.checkNotNull(lastKey, + "The last key can't be null in the continue token."); + this.lastKey = lastKey; + if (lastDir != null && lastDir.length() > 0) { + this.lastDir = lastDir; + } + } + + /** + * Generate a continuation token which is used in get Bucket. + * + * @return if key is not null return continuation token, else returns null. + */ + public String encodeToString() { + if (this.lastKey != null) { + + ByteBuffer buffer = ByteBuffer + .allocate(4 + lastKey.length() + + (lastDir == null ? 0 : lastDir.length())); + buffer.putInt(lastKey.length()); + buffer.put(lastKey.getBytes(StandardCharsets.UTF_8)); + if (lastDir != null) { + buffer.put(lastDir.getBytes(StandardCharsets.UTF_8)); + } + + String hex = Hex.encodeHexString(buffer.array()); + String digest = DigestUtils.sha256Hex(hex); + return hex + CONTINUE_TOKEN_SEPERATOR + digest; + } else { + return null; + } + } + + /** + * Decode a continuation token which is used in get Bucket. + * + * @param key + * @return if key is not null return decoded token, otherwise returns null. 
+ * @throws OS3Exception + */ + public static ContinueToken decodeFromString(String key) throws OS3Exception { + if (key != null) { + int indexSeparator = key.indexOf(CONTINUE_TOKEN_SEPERATOR); + if (indexSeparator == -1) { + throw S3ErrorTable.newError(S3ErrorTable.INVALID_ARGUMENT, key); + } + String hex = key.substring(0, indexSeparator); + String digest = key.substring(indexSeparator + 1); + try { + checkHash(key, hex, digest); + + ByteBuffer buffer = ByteBuffer.wrap(Hex.decodeHex(hex)); + int keySize = buffer.getInt(); + + byte[] actualKeyBytes = new byte[keySize]; + buffer.get(actualKeyBytes); + + byte[] actualDirBytes = new byte[buffer.remaining()]; + buffer.get(actualDirBytes); + + return new ContinueToken( + new String(actualKeyBytes, StandardCharsets.UTF_8), + new String(actualDirBytes, StandardCharsets.UTF_8) + ); + + } catch (DecoderException ex) { + OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable + .INVALID_ARGUMENT, key); + os3Exception.setErrorMessage("The continuation token provided is " + + "incorrect"); + throw os3Exception; + } + } else { + return null; + } + } + + private static void checkHash(String key, String hex, String digest) + throws OS3Exception { + String digestActualKey = DigestUtils.sha256Hex(hex); + if (!digest.equals(digestActualKey)) { + OS3Exception ex = S3ErrorTable.newError(S3ErrorTable + .INVALID_ARGUMENT, key); + ex.setErrorMessage("The continuation token provided is incorrect"); + throw ex; + } + } + + public String getLastKey() { + return lastKey; + } + + public void setLastKey(String lastKey) { + this.lastKey = lastKey; + } + + public String getLastDir() { + return lastDir; + } + + public void setLastDir(String lastDir) { + this.lastDir = lastDir; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ContinueToken that = (ContinueToken) o; + return lastKey.equals(that.lastKey) && + Objects.equals(lastDir, that.lastDir); + } + + @Override + public int hashCode() { + return Objects.hash(lastKey); + } + + @Override + public String toString() { + return "ContinueToken{" + + "lastKey='" + lastKey + '\'' + + ", lastDir='" + lastDir + '\'' + + '}'; + } +} diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3utils.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3utils.java index 3ce9bbdaa1..7b9c7e8e5a 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3utils.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3utils.java @@ -18,19 +18,11 @@ */ package org.apache.hadoop.ozone.s3.util; -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ozone.s3.exception.OS3Exception; -import org.apache.hadoop.ozone.s3.exception.S3ErrorTable; - - -import java.nio.charset.StandardCharsets; import java.util.regex.Matcher; -import static org.apache.hadoop.ozone.s3.util.S3Consts - .RANGE_HEADER_MATCH_PATTERN; +import org.apache.hadoop.classification.InterfaceAudience; + +import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER_MATCH_PATTERN; /** * Utility class for S3. 
*/ @@ -40,60 +32,9 @@ public final class S3utils { private S3utils() { } - private static final String CONTINUE_TOKEN_SEPERATOR = "-"; - /** - * Generate a continuation token which is used in get Bucket. - * @param key - * @return if key is not null return continuation token, else returns null. - */ - public static String generateContinueToken(String key) { - if (key != null) { - byte[] byteData = key.getBytes(StandardCharsets.UTF_8); - String hex = Hex.encodeHexString(byteData); - String digest = DigestUtils.sha256Hex(key); - return hex + CONTINUE_TOKEN_SEPERATOR + digest; - } else { - return null; - } - } - /** - * Decode a continuation token which is used in get Bucket. - * @param key - * @return if key is not null return decoded token, otherwise returns null. - * @throws OS3Exception - */ - public static String decodeContinueToken(String key) throws OS3Exception { - if (key != null) { - int indexSeparator = key.indexOf(CONTINUE_TOKEN_SEPERATOR); - if (indexSeparator == -1) { - throw S3ErrorTable.newError(S3ErrorTable.INVALID_ARGUMENT, key); - } - String hex = key.substring(0, indexSeparator); - String digest = key.substring(indexSeparator + 1); - try { - byte[] actualKeyBytes = Hex.decodeHex(hex); - String digestActualKey = DigestUtils.sha256Hex(actualKeyBytes); - if (digest.equals(digestActualKey)) { - return new String(actualKeyBytes, StandardCharsets.UTF_8); - } else { - OS3Exception ex = S3ErrorTable.newError(S3ErrorTable - .INVALID_ARGUMENT, key); - ex.setErrorMessage("The continuation token provided is incorrect"); - throw ex; - } - } catch (DecoderException ex) { - OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable - .INVALID_ARGUMENT, key); - os3Exception.setErrorMessage("The continuation token provided is " + - "incorrect"); - throw os3Exception; - } - } else { - return null; - } - } + /** diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestBucketGet.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestBucketGet.java index 8b70d2990b..b3410b4709 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestBucketGet.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestBucketGet.java @@ -211,6 +211,53 @@ public void listWithContinuationToken() throws OS3Exception, IOException { } + @Test + public void listWithContinuationTokenDirBreak() + throws OS3Exception, IOException { + + BucketEndpoint getBucket = new BucketEndpoint(); + + OzoneClient ozoneClient = + createClientWithKeys( + "test/dir1/file1", + "test/dir1/file2", + "test/dir1/file3", + "test/dir2/file4", + "test/dir2/file5", + "test/dir2/file6", + "test/dir3/file7", + "test/file8"); + + getBucket.setClient(ozoneClient); + + int maxKeys = 2; + + ListObjectResponse getBucketResponse; + + getBucketResponse = + (ListObjectResponse) getBucket.list("b1", "/", null, null, maxKeys, + "test/", null, null, null, null).getEntity(); + + Assert.assertEquals(0, getBucketResponse.getContents().size()); + Assert.assertEquals(2, getBucketResponse.getCommonPrefixes().size()); + Assert.assertEquals("test/dir1/", + getBucketResponse.getCommonPrefixes().get(0).getPrefix()); + Assert.assertEquals("test/dir2/", + getBucketResponse.getCommonPrefixes().get(1).getPrefix()); + + getBucketResponse = + (ListObjectResponse) getBucket.list("b1", "/", null, null, maxKeys, + "test/", null, getBucketResponse.getNextToken(), null, null) + .getEntity(); + Assert.assertEquals(1, 
getBucketResponse.getContents().size()); + Assert.assertEquals(1, getBucketResponse.getCommonPrefixes().size()); + Assert.assertEquals("test/dir3/", + getBucketResponse.getCommonPrefixes().get(0).getPrefix()); + Assert.assertEquals("test/file8", + getBucketResponse.getContents().get(0).getKey()); + + } + @Test /** * This test is with prefix and delimiter and verify continuation-token @@ -237,7 +284,6 @@ public void listWithContinuationToken1() throws OS3Exception, IOException { Assert.assertTrue(getBucketResponse.isTruncated()); Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2); - // 2nd time String continueToken = getBucketResponse.getNextToken(); getBucketResponse = @@ -246,7 +292,6 @@ public void listWithContinuationToken1() throws OS3Exception, IOException { Assert.assertTrue(getBucketResponse.isTruncated()); Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2); - //3rd time continueToken = getBucketResponse.getNextToken(); getBucketResponse = diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestContinueToken.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestContinueToken.java new file mode 100644 index 0000000000..a590367dfa --- /dev/null +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestContinueToken.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.s3.util; + +import org.apache.hadoop.ozone.s3.exception.OS3Exception; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test encode/decode of the continue token. + */ +public class TestContinueToken { + + @Test + public void encodeDecode() throws OS3Exception { + ContinueToken ct = new ContinueToken("key1", "dir1"); + + ContinueToken parsedToken = + ContinueToken.decodeFromString(ct.encodeToString()); + + Assert.assertEquals(ct, parsedToken); + } + + @Test + public void encodeDecodeNullDir() throws OS3Exception { + ContinueToken ct = new ContinueToken("key1", null); + + ContinueToken parsedToken = + ContinueToken.decodeFromString(ct.encodeToString()); + + Assert.assertEquals(ct, parsedToken); + } + +} \ No newline at end of file From 96ea464aaadeadc1ef149dab93813bcfd94fa21a Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Wed, 16 Jan 2019 17:43:30 -0800 Subject: [PATCH 06/13] HDDS-971. ContainerDataConstructor throws exception on QUASI_CLOSED and UNHEALTHY container state. Contributed by Lokesh Jain. --- .../common/impl/ContainerDataYaml.java | 17 ++--------------- .../container/ozoneimpl/ContainerReader.java | 4 ++-- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java index 2d09124a46..15719440f3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java @@ -256,21 +256,8 @@ public Object construct(Node node) { kvData.setMetadata(meta); kvData.setChecksum((String) nodes.get(OzoneConsts.CHECKSUM)); String state = (String) nodes.get(OzoneConsts.STATE); - switch (state) { - case "OPEN": - kvData.setState(ContainerProtos.ContainerDataProto.State.OPEN); - break; - case "CLOSING": - kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSING); - break; - case "CLOSED": - kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSED); - break; - default: - throw new IllegalStateException("Unexpected " + - "ContainerLifeCycleState " + state + " for the containerId " + - nodes.get(OzoneConsts.CONTAINER_ID)); - } + kvData + .setState(ContainerProtos.ContainerDataProto.State.valueOf(state)); return kvData; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index 43f78ed4eb..0192fd5dd1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -96,8 +96,8 @@ public void run() { try { readVolume(hddsVolumeDir); } catch (RuntimeException ex) { - LOG.info("Caught an Run time exception during reading container files" + - " from Volume {}", 
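
In short: ContainerDataYaml parsed the persisted container state with a hand-written switch that only knew OPEN, CLOSING and CLOSED, so containers recorded as QUASI_CLOSED or UNHEALTHY failed to load. The patch delegates to the protobuf enum's valueOf(), which keeps the parser in step with every defined state. A rough sketch of the pattern follows; the enum and wrapper are illustrative, not the actual ContainerProtos.ContainerDataProto.State or ContainerDataYaml code, and the real change does not add a try/catch (it is shown here only to make the failure mode for unknown names explicit).

    // Parsing an enum by name instead of enumerating constants in a switch:
    // states added to the enum later are accepted without touching the parser.
    enum ContainerState { OPEN, CLOSING, CLOSED, QUASI_CLOSED, UNHEALTHY }

    final class ContainerStateParser {
      static ContainerState parse(String name, long containerId) {
        try {
          return ContainerState.valueOf(name);
        } catch (IllegalArgumentException e) {
          // Genuinely unknown names still fail loudly, tied to the container id.
          throw new IllegalStateException("Unexpected container state " + name
              + " for container " + containerId, e);
        }
      }
    }
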
hddsVolumeDir); + LOG.error("Caught a Run time exception during reading container files" + + " from Volume {} {}", hddsVolumeDir, ex); } } From 0a46baecd31c485d1ea4e567c29c47bfba0b092e Mon Sep 17 00:00:00 2001 From: rahul3 Date: Thu, 8 Nov 2018 21:27:06 -0500 Subject: [PATCH 07/13] YARN-9203. Fix typos in yarn-default.xml. This closes #437 Signed-off-by: Akira Ajisaka --- .../hadoop-yarn-common/src/main/resources/yarn-default.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index a0e0eda74f..fea635b42b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -539,8 +539,8 @@ This property allows users to set ACLs of their choice instead of using the default mechanism. For fencing to work, the ACLs should be - carefully set differently on each ResourceManger such that all the - ResourceManagers have shared admin access and the Active ResourceManger + carefully set differently on each ResourceManager such that all the + ResourceManagers have shared admin access and the Active ResourceManager takes over (exclusively) the create-delete access. yarn.resourcemanager.zk-state-store.root-node.acl From 6d7eedfd28cc1712690db2f6ca8a281b0901ee28 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Thu, 17 Jan 2019 14:20:10 -0800 Subject: [PATCH 08/13] YARN-9194. Invalid event: REGISTERED and LAUNCH_FAILED at FAILED, and NullPointerException happens in RM while shutdown a NM. (lujie via wangda) Change-Id: I4359f59a73a278a941f4bb9d106dd38c9cb471fe --- .../rmapp/attempt/RMAppAttemptImpl.java | 14 +++- .../attempt/TestRMAppAttemptTransitions.java | 80 ++++++++++++++++++- 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 3ec9c49818..03039daae9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -437,9 +437,11 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition()) RMAppAttemptState.FAILED, EnumSet.of( RMAppAttemptEventType.LAUNCHED, + RMAppAttemptEventType.LAUNCH_FAILED, RMAppAttemptEventType.EXPIRE, RMAppAttemptEventType.KILL, RMAppAttemptEventType.FAIL, + RMAppAttemptEventType.REGISTERED, RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.STATUS_UPDATE, RMAppAttemptEventType.CONTAINER_ALLOCATED)) @@ -1203,10 +1205,16 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } // Set the masterContainer - appAttempt.setMasterContainer(amContainerAllocation.getContainers() - .get(0)); + Container amContainer = amContainerAllocation.getContainers().get(0); RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler - 
.getRMContainer(appAttempt.getMasterContainer().getId()); + .getRMContainer(amContainer.getId()); + //while one NM is removed, the scheduler will clean the container,the + //following CONTAINER_FINISHED event will handle the cleaned container. + //so just return RMAppAttemptState.SCHEDULED + if (rmMasterContainer == null) { + return RMAppAttemptState.SCHEDULED; + } + appAttempt.setMasterContainer(amContainer); rmMasterContainer.setAMContainer(true); // The node set in NMTokenSecrentManager is used for marking whether the // NMToken has been issued for this node to the AM. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 4a5c671969..faecdb4660 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -986,7 +986,7 @@ public void testLaunchedAtFinalSaving() { public void testAttemptAddedAtFinalSaving() { submitApplicationAttempt(); - // SUBNITED->FINAL_SAVING + // SUBMITTED->FINAL_SAVING applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt .getAppAttemptId(), RMAppAttemptEventType.KILL)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -999,6 +999,56 @@ public void testAttemptAddedAtFinalSaving() { applicationAttempt.getAppAttemptState()); } + @Test(timeout = 10000) + public void testAttemptRegisteredAtFailed() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + + //send CONTAINER_FINISHED event + NodeId anyNodeId = NodeId.newInstance("host", 1234); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( + amContainer.getId(), ContainerState.COMPLETE, "", 0, + amContainer.getResource()), anyNodeId)); + assertEquals(RMAppAttemptState.FINAL_SAVING, + applicationAttempt.getAppAttemptState()); + + sendAttemptUpdateSavedEvent(applicationAttempt); + assertEquals(RMAppAttemptState.FAILED, + applicationAttempt.getAppAttemptState()); + + //send REGISTERED event + applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt + .getAppAttemptId(), RMAppAttemptEventType.REGISTERED)); + + assertEquals(RMAppAttemptState.FAILED, + applicationAttempt.getAppAttemptState()); + } + + @Test + public void testAttemptLaunchFailedAtFailed() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + //send CONTAINER_FINISHED event + NodeId anyNodeId = NodeId.newInstance("host", 1234); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( + amContainer.getId(), ContainerState.COMPLETE, "", 0, + amContainer.getResource()), anyNodeId)); + assertEquals(RMAppAttemptState.FINAL_SAVING, + applicationAttempt.getAppAttemptState()); + sendAttemptUpdateSavedEvent(applicationAttempt); + assertEquals(RMAppAttemptState.FAILED, + 
applicationAttempt.getAppAttemptState()); + + //send LAUNCH_FAILED event + applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt + .getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED)); + + assertEquals(RMAppAttemptState.FAILED, + applicationAttempt.getAppAttemptState()); + } + @Test public void testAMCrashAtAllocated() { Container amContainer = allocateApplicationAttempt(); @@ -1598,6 +1648,34 @@ public void testFailedToFailed() { assertTrue(found); } + @Test + public void testContainerRemovedBeforeAllocate() { + scheduleApplicationAttempt(); + + // Mock the allocation of AM container + Container container = mock(Container.class); + Resource resource = BuilderUtils.newResource(2048, 1); + when(container.getId()).thenReturn( + BuilderUtils.newContainerId(applicationAttempt.getAppAttemptId(), 1)); + when(container.getResource()).thenReturn(resource); + Allocation allocation = mock(Allocation.class); + when(allocation.getContainers()). + thenReturn(Collections.singletonList(container)); + when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class), + any(List.class), any(List.class), any(List.class), any(List.class), + any(ContainerUpdates.class))). + thenReturn(allocation); + + //container removed, so return null + when(scheduler.getRMContainer(container.getId())). + thenReturn(null); + + applicationAttempt.handle( + new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.CONTAINER_ALLOCATED)); + assertEquals(RMAppAttemptState.SCHEDULED, + applicationAttempt.getAppAttemptState()); + } @SuppressWarnings("deprecation") @Test From 96a84b61fb693e9657f1f1ae3d6ed3a04a7e8efa Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Fri, 18 Jan 2019 15:31:45 +0900 Subject: [PATCH 09/13] HDFS-14213. Remove Jansson from BUILDING.txt. Contributed by Dinesh Chitlangia. --- BUILDING.txt | 3 --- 1 file changed, 3 deletions(-) diff --git a/BUILDING.txt b/BUILDING.txt index d781c2ca0b..cc9ac177ca 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -14,7 +14,6 @@ Requirements: Clang (community version), Clang (version for iOS 9 and later) (if compiling native code) * openssl devel (if compiling native hadoop-pipes and to get the best HDFS encryption performance) * Linux FUSE (Filesystem in Userspace) version 2.6 or above (if compiling fuse_dfs) -* Jansson C XML parsing library ( if compiling libwebhdfs ) * Doxygen ( if compiling libhdfspp and generating the documents ) * Internet connection for first build (to fetch all Maven and Hadoop dependencies) * python (for releasedocs) @@ -75,8 +74,6 @@ Optional packages: (OR https://github.com/01org/isa-l) * Bzip2 $ sudo apt-get install bzip2 libbz2-dev -* Jansson (C Library for JSON) - $ sudo apt-get install libjansson-dev * Linux FUSE $ sudo apt-get install fuse libfuse-dev * ZStandard compression From dacc1a759e3ba3eca000cbacc6145b231253b174 Mon Sep 17 00:00:00 2001 From: Surendra Singh Lilhore Date: Fri, 18 Jan 2019 13:55:12 +0530 Subject: [PATCH 10/13] HDFS-14175. EC: Native XOR decoder should reset the output buffer before using it. Contributed by Ayush Saxena. 
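
The underlying issue: the native XOR decoder reconstructs the single missing unit by XOR-folding every surviving unit into the output buffer, and that buffer can hold stale bytes from an earlier call, so the fold must start from zero. The fix below adds a memset of the output chunk in the JNI decoder. For illustration, the same idea in Java; this is a conceptual sketch, not the RawErasureDecoder API, and the names and signature are made up.

    import java.util.Arrays;

    final class XorDecodeSketch {
      // Rebuild the erased unit: zero the output, then XOR in every surviving
      // input. Without the initial fill the result would be XORed on top of
      // whatever the buffer held previously -- the bug the added memset()
      // fixes in the native decoder.
      static void decode(byte[][] inputs, byte[] output, int chunkSize) {
        Arrays.fill(output, 0, chunkSize, (byte) 0);
        for (byte[] in : inputs) {
          if (in == null) {      // the erased unit is passed as null
            continue;
          }
          for (int i = 0; i < chunkSize; i++) {
            output[i] ^= in[i];
          }
        }
      }
    }
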
--- .../src/org/apache/hadoop/io/erasurecode/jni_xor_decoder.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/erasurecode/jni_xor_decoder.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/erasurecode/jni_xor_decoder.c index d2de0c67cf..6832aa8253 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/erasurecode/jni_xor_decoder.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/erasurecode/jni_xor_decoder.c @@ -66,6 +66,8 @@ Java_org_apache_hadoop_io_erasurecode_rawcoder_NativeXORRawDecoder_decodeImpl( numDataUnits + numParityUnits); getOutputs(env, outputs, outputOffsets, xorDecoder->outputs, numParityUnits); + memset(xorDecoder->outputs[0], 0, chunkSize); + for (i = 0; i < numDataUnits + numParityUnits; i++) { if (xorDecoder->inputs[i] == NULL) { continue; From 8c7f6b2d4df2e5ca7b766db68213b778d28f198b Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Fri, 18 Jan 2019 13:49:56 -0500 Subject: [PATCH 11/13] YARN-9197. Add safe guard against NPE for component instance failure. Contributed by kyungwan nam --- .../service/component/instance/ComponentInstance.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index dab1e463c0..66c298d337 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -350,8 +350,10 @@ static void handleComponentInstanceRelaunch(ComponentInstance compInstance, // record in ATS LOG.info("Publishing component instance status {} {} ", event.getContainerId(), containerState); + int exitStatus = failureBeforeLaunch || event.getStatus() == null ? + ContainerExitStatus.INVALID : event.getStatus().getExitStatus(); compInstance.serviceTimelinePublisher.componentInstanceFinished( - event.getContainerId(), event.getStatus().getExitStatus(), + event.getContainerId(), exitStatus, containerState, containerDiag); } @@ -366,8 +368,10 @@ static void handleComponentInstanceRelaunch(ComponentInstance compInstance, if (compInstance.timelineServiceEnabled) { // record in ATS + int exitStatus = failureBeforeLaunch || event.getStatus() == null ? + ContainerExitStatus.INVALID : event.getStatus().getExitStatus(); compInstance.serviceTimelinePublisher.componentInstanceFinished( - event.getContainerId(), event.getStatus().getExitStatus(), + event.getContainerId(), exitStatus, containerState, containerDiag); } From 4ac0404fe01c2266068b6fc54588e3a4bcec3e12 Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Fri, 18 Jan 2019 11:08:35 -0800 Subject: [PATCH 12/13] HDDS-959. KeyOutputStream should handle retry failures. Contributed by Lokesh Jain. 
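
Summary of the change: when the Ratis client gives up retrying, the cached XceiverClient for that pipeline is effectively dead (subsequent calls fail with AlreadyClosedException), so KeyOutputStream now detects that condition and releases the client with an invalidate flag, which evicts it from the XceiverClientManager cache and forces the next writer onto a fresh connection. The detection walks the exception cause chain; below is a simplified sketch of that helper (the real method is a varargs checkForException over arbitrary exception classes, and the class name here is illustrative).

    import java.io.IOException;
    import org.apache.ratis.protocol.AlreadyClosedException;
    import org.apache.ratis.protocol.RaftRetryFailureException;

    final class RetryFailureCheckSketch {
      // True if any cause in the chain signals that the Ratis client has
      // exhausted its retries (or was already closed by such a failure);
      // the caller then rebuilds the block stream on a new XceiverClient.
      static boolean isRetryFailure(IOException ioe) {
        for (Throwable t = ioe.getCause(); t != null; t = t.getCause()) {
          if (t instanceof RaftRetryFailureException
              || t instanceof AlreadyClosedException) {
            return true;
          }
        }
        return false;
      }
    }

When this returns true, handleException() cleans up the current BlockOutputStreamEntry with retryFailure=true, and releaseClient(client, true) drops the pipeline's entry from the client cache only if the cached instance is still the failed client.
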
--- .../hadoop/hdds/scm/XceiverClientManager.java | 11 +- .../scm/client/ContainerOperationClient.java | 10 +- .../hdds/scm/storage/BlockInputStream.java | 2 +- .../hdds/scm/storage/BlockOutputStream.java | 17 +-- .../ozone/client/io/KeyInputStream.java | 2 +- .../ozone/client/io/KeyOutputStream.java | 109 ++++++++---------- .../TestContainerStateMachineIdempotency.java | 2 +- .../rpc/TestOzoneRpcClientAbstract.java | 2 +- .../ozone/scm/TestContainerSmallFile.java | 8 +- .../TestGetCommittedBlockLengthAndPutKey.java | 6 +- .../ozone/scm/TestXceiverClientManager.java | 52 +++++++-- 11 files changed, 127 insertions(+), 94 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index b2735bc79f..f3614635ea 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -129,11 +129,20 @@ public XceiverClientSpi acquireClient(Pipeline pipeline) * Releases a XceiverClientSpi after use. * * @param client client to release + * @param invalidateClient if true, invalidates the client in cache */ - public void releaseClient(XceiverClientSpi client) { + public void releaseClient(XceiverClientSpi client, boolean invalidateClient) { Preconditions.checkNotNull(client); synchronized (clientCache) { client.decrementReference(); + if (invalidateClient) { + Pipeline pipeline = client.getPipeline(); + String key = pipeline.getId().getId().toString() + pipeline.getType(); + XceiverClientSpi cachedClient = clientCache.getIfPresent(key); + if (cachedClient == client) { + clientCache.invalidate(key); + } + } } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index 85b5d29f32..6dddc91dff 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -100,7 +100,7 @@ public ContainerWithPipeline createContainer(String owner) return containerWithPipeline; } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } @@ -191,7 +191,7 @@ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type, return containerWithPipeline; } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } @@ -269,7 +269,7 @@ public void deleteContainer(long containerId, Pipeline pipeline, } } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } @@ -318,7 +318,7 @@ public ContainerDataProto readContainer(long containerID, return response.getContainerData(); } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } @@ -410,7 +410,7 @@ public void closeContainer(long containerId, Pipeline pipeline) ObjectStageChangeRequestProto.Stage.complete); } finally { if (client != null) { - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } } diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index ddd01d3083..5303efd2aa 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -141,7 +141,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException { @Override public synchronized void close() { if (xceiverClientManager != null && xceiverClient != null) { - xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager.releaseClient(xceiverClient, false); xceiverClientManager = null; xceiverClient = null; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index b62f7b6a66..6cc0b54e7f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.OzoneChecksumException; @@ -113,7 +114,7 @@ public class BlockOutputStream extends OutputStream { * @param blockID block ID * @param key chunk key * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls + * @param pipeline pipeline where block will be written * @param traceID container protocol call args * @param chunkSize chunk size * @param bufferList list of byte buffers @@ -124,10 +125,10 @@ public class BlockOutputStream extends OutputStream { */ @SuppressWarnings("parameternumber") public BlockOutputStream(BlockID blockID, String key, - XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, + XceiverClientManager xceiverClientManager, Pipeline pipeline, String traceID, int chunkSize, long streamBufferFlushSize, - long streamBufferMaxSize, long watchTimeout, - List bufferList, Checksum checksum) { + long streamBufferMaxSize, long watchTimeout, List bufferList, + Checksum checksum) throws IOException { this.blockID = blockID; this.key = key; this.traceID = traceID; @@ -138,7 +139,7 @@ public BlockOutputStream(BlockID blockID, String key, BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; + this.xceiverClient = xceiverClientManager.acquireClient(pipeline); this.streamId = UUID.randomUUID().toString(); this.chunkIndex = 0; this.streamBufferFlushSize = streamBufferFlushSize; @@ -500,7 +501,7 @@ public void close() throws IOException { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } finally { - cleanup(); + cleanup(false); } } // clear the currentBuffer @@ -541,9 +542,9 @@ private void setIoException(Exception e) { } } - public void cleanup() { + public void cleanup(boolean invalidateClient) { if (xceiverClientManager != null) { 
- xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager.releaseClient(xceiverClient, invalidateClient); } xceiverClientManager = null; xceiverClient = null; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 99817fbbbc..dde3641f05 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -311,7 +311,7 @@ public static LengthInputStream getFromOmKeyInfo( omKeyLocationInfo.getLength()); } finally { if (!success) { - xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager.releaseClient(xceiverClient, false); } } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 66e419906d..cbbfed859f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.om.helpers.*; @@ -31,11 +32,11 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.hdds.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,19 +108,6 @@ public KeyOutputStream() { this.checksum = new Checksum(); } - /** - * For testing purpose only. Not building output stream from blocks, but - * taking from externally. 
- * - * @param outputStream - * @param length - */ - @VisibleForTesting - public void addStream(OutputStream outputStream, long length) { - streamEntries.add( - new BlockOutputStreamEntry(outputStream, length, checksum)); - } - @VisibleForTesting public List getStreamEntries() { return streamEntries; @@ -213,12 +201,11 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) throws IOException { ContainerWithPipeline containerWithPipeline = scmClient .getContainerWithPipeline(subKeyInfo.getContainerID()); - XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(containerWithPipeline.getPipeline()); streamEntries.add(new BlockOutputStreamEntry(subKeyInfo.getBlockID(), - keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, - chunkSize, subKeyInfo.getLength(), streamBufferFlushSize, - streamBufferMaxSize, watchTimeout, bufferList, checksum)); + keyArgs.getKeyName(), xceiverClientManager, + containerWithPipeline.getPipeline(), requestID, chunkSize, + subKeyInfo.getLength(), streamBufferFlushSize, streamBufferMaxSize, + watchTimeout, bufferList, checksum)); } @@ -297,12 +284,14 @@ private void handleWrite(byte[] b, int off, long len, boolean retry) current.write(b, off, writeLen); } } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) { + boolean retryFailure = checkForRetryFailure(ioe); + if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe) + || retryFailure) { // for the current iteration, totalDataWritten - currentPos gives the // amount of data already written to the buffer writeLen = (int) (current.getWrittenDataLength() - currentPos); LOG.debug("writeLen {}, total len {}", writeLen, len); - handleException(current, currentStreamIndex); + handleException(current, currentStreamIndex, retryFailure); } else { throw ioe; } @@ -362,17 +351,19 @@ private void removeEmptyBlocks() { * * @param streamEntry StreamEntry * @param streamIndex Index of the entry + * @param retryFailure if true the xceiverClient needs to be invalidated in + * the client cache. * @throws IOException Throws IOException if Write fails */ private void handleException(BlockOutputStreamEntry streamEntry, - int streamIndex) throws IOException { + int streamIndex, boolean retryFailure) throws IOException { long totalSuccessfulFlushedData = streamEntry.getTotalSuccessfulFlushedData(); //set the correct length for the current stream streamEntry.currentPosition = totalSuccessfulFlushedData; long bufferedDataLen = computeBufferData(); // just clean up the current stream. - streamEntry.cleanup(); + streamEntry.cleanup(retryFailure); if (bufferedDataLen > 0) { // If the data is still cached in the underlying stream, we need to // allocate new block and write this data in the datanode. @@ -390,7 +381,7 @@ private void handleException(BlockOutputStreamEntry streamEntry, private boolean checkIfContainerIsClosed(IOException ioe) { if (ioe.getCause() != null) { - return checkIfContainerNotOpenOrRaftRetryFailureException(ioe) || Optional + return checkForException(ioe, ContainerNotOpenException.class) || Optional .of(ioe.getCause()) .filter(e -> e instanceof StorageContainerException) .map(e -> (StorageContainerException) e) @@ -400,13 +391,23 @@ private boolean checkIfContainerIsClosed(IOException ioe) { return false; } - private boolean checkIfContainerNotOpenOrRaftRetryFailureException( - IOException ioe) { + /** + * Checks if the provided exception signifies retry failure in ratis client. 
+ * In case of retry failure, ratis client throws RaftRetryFailureException + * and all succeeding operations are failed with AlreadyClosedException. + */ + private boolean checkForRetryFailure(IOException ioe) { + return checkForException(ioe, RaftRetryFailureException.class, + AlreadyClosedException.class); + } + + private boolean checkForException(IOException ioe, Class... classes) { Throwable t = ioe.getCause(); while (t != null) { - if (t instanceof ContainerNotOpenException - || t instanceof RaftRetryFailureException) { - return true; + for (Class cls : classes) { + if (cls.isInstance(t)) { + return true; + } } t = t.getCause(); } @@ -469,11 +470,13 @@ private void handleFlushOrClose(boolean close) throws IOException { entry.flush(); } } catch (IOException ioe) { - if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) { + boolean retryFailure = checkForRetryFailure(ioe); + if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe) + || retryFailure) { // This call will allocate a new streamEntry and write the Data. // Close needs to be retried on the newly allocated streamEntry as // as well. - handleException(entry, streamIndex); + handleException(entry, streamIndex, retryFailure); handleFlushOrClose(close); } else { throw ioe; @@ -643,7 +646,7 @@ private static class BlockOutputStreamEntry extends OutputStream { private BlockID blockID; private final String key; private final XceiverClientManager xceiverClientManager; - private final XceiverClientSpi xceiverClient; + private final Pipeline pipeline; private final Checksum checksum; private final String requestId; private final int chunkSize; @@ -660,14 +663,14 @@ private static class BlockOutputStreamEntry extends OutputStream { @SuppressWarnings("parameternumber") BlockOutputStreamEntry(BlockID blockID, String key, XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, String requestId, int chunkSize, + Pipeline pipeline, String requestId, int chunkSize, long length, long streamBufferFlushSize, long streamBufferMaxSize, long watchTimeout, List bufferList, Checksum checksum) { this.outputStream = null; this.blockID = blockID; this.key = key; this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; + this.pipeline = pipeline; this.requestId = requestId; this.chunkSize = chunkSize; @@ -680,30 +683,6 @@ private static class BlockOutputStreamEntry extends OutputStream { this.bufferList = bufferList; } - /** - * For testing purpose, taking a some random created stream instance. - * @param outputStream a existing writable output stream - * @param length the length of data to write to the stream - */ - BlockOutputStreamEntry(OutputStream outputStream, long length, - Checksum checksum) { - this.outputStream = outputStream; - this.blockID = null; - this.key = null; - this.xceiverClientManager = null; - this.xceiverClient = null; - this.requestId = null; - this.chunkSize = -1; - - this.length = length; - this.currentPosition = 0; - streamBufferFlushSize = 0; - streamBufferMaxSize = 0; - bufferList = null; - watchTimeout = 0; - this.checksum = checksum; - } - long getLength() { return length; } @@ -712,11 +691,17 @@ long getRemaining() { return length - currentPosition; } - private void checkStream() { + /** + * BlockOutputStream is initialized in this function. This makes sure that + * xceiverClient initialization is not done during preallocation and only + * done when data is written. 
+ * @throws IOException if xceiverClient initialization fails + */ + private void checkStream() throws IOException { if (this.outputStream == null) { this.outputStream = new BlockOutputStream(blockID, key, xceiverClientManager, - xceiverClient, requestId, chunkSize, streamBufferFlushSize, + pipeline, requestId, chunkSize, streamBufferFlushSize, streamBufferMaxSize, watchTimeout, bufferList, checksum); } } @@ -781,11 +766,11 @@ long getWrittenDataLength() throws IOException { throw new IOException("Invalid Output Stream for Key: " + key); } - void cleanup() { + void cleanup(boolean invalidateClient) throws IOException { checkStream(); if (this.outputStream instanceof BlockOutputStream) { BlockOutputStream out = (BlockOutputStream) this.outputStream; - out.cleanup(); + out.cleanup(invalidateClient); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java index 78a85110d3..c4dbb8bebb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java @@ -116,6 +116,6 @@ public void testContainerStateMachineIdempotency() throws Exception { } catch (IOException ioe) { Assert.fail("Container operation failed" + ioe); } - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index e7bca5e78d..529fcc3968 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -698,7 +698,7 @@ public void testPutKeyAndGetKeyThreeNodes() Assert.assertTrue( e.getMessage().contains("on the pipeline " + pipeline.getId())); } - manager.releaseClient(clientSpi); + manager.releaseClient(clientSpi, false); } private void readKey(OzoneBucket bucket, String keyName, String data) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index ecf0d846b1..daf09c1e8d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -98,7 +98,7 @@ public void testAllocateWrite() throws Exception { ContainerProtocolCalls.readSmallFile(client, blockID, traceID); String readData = response.getData().getData().toStringUtf8(); Assert.assertEquals("data123", readData); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -121,7 +121,7 @@ public void testInvalidBlockRead() throws Exception { // Try to read a Key Container Name ContainerProtos.GetSmallFileResponseProto response = ContainerProtocolCalls.readSmallFile(client, blockID, traceID); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -149,7 +149,7 
@@ public void testInvalidContainerRead() throws Exception { ContainerProtocolCalls.readSmallFile(client, ContainerTestHelper.getTestBlockID( nonExistContainerID), traceID); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -202,7 +202,7 @@ public void testReadWriteWithBCSId() throws Exception { ContainerProtocolCalls.readSmallFile(client, blockID1, traceID); String readData = response.getData().getData().toStringUtf8(); Assert.assertEquals("data123", readData); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index b4601fa4d0..7c31b14362 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -114,7 +114,7 @@ public void tesGetCommittedBlockLength() throws Exception { Assert.assertTrue( BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); Assert.assertTrue(response.getBlockLength() == data.length); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -139,7 +139,7 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception { } catch (StorageContainerException sce) { Assert.assertTrue(sce.getMessage().contains("Unable to find the block")); } - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } @Test @@ -180,6 +180,6 @@ public void tesPutKeyResposne() throws Exception { // This will also ensure that closing the container committed the block // on the Datanodes. 
Assert.assertEquals(responseBlockID, blockID); - xceiverClientManager.releaseClient(client); + xceiverClientManager.releaseClient(client, false); } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java index 8b35bbbb18..f28493fed8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -96,9 +96,9 @@ public void testCaching() throws IOException { Assert.assertEquals(2, client3.getRefcount()); Assert.assertEquals(2, client1.getRefcount()); Assert.assertEquals(client1, client3); - clientManager.releaseClient(client1); - clientManager.releaseClient(client2); - clientManager.releaseClient(client3); + clientManager.releaseClient(client1, false); + clientManager.releaseClient(client2, false); + clientManager.releaseClient(client3, false); } @Test @@ -140,7 +140,7 @@ public void testFreeByReference() throws IOException { // After releasing the client, this connection should be closed // and any container operations should fail - clientManager.releaseClient(client1); + clientManager.releaseClient(client1, false); String expectedMessage = "This channel is not connected."; try { @@ -152,7 +152,7 @@ public void testFreeByReference() throws IOException { Assert.assertEquals(e.getClass(), IOException.class); Assert.assertTrue(e.getMessage().contains(expectedMessage)); } - clientManager.releaseClient(client2); + clientManager.releaseClient(client2, false); } @Test @@ -171,7 +171,7 @@ public void testFreeByEviction() throws IOException { .acquireClient(container1.getPipeline()); Assert.assertEquals(1, client1.getRefcount()); - clientManager.releaseClient(client1); + clientManager.releaseClient(client1, false); Assert.assertEquals(0, client1.getRefcount()); ContainerWithPipeline container2 = storageContainerLocationClient @@ -200,6 +200,44 @@ public void testFreeByEviction() throws IOException { Assert.assertEquals(e.getClass(), IOException.class); Assert.assertTrue(e.getMessage().contains(expectedMessage)); } - clientManager.releaseClient(client2); + clientManager.releaseClient(client2, false); + } + + @Test + public void testFreeByRetryFailure() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); + XceiverClientManager clientManager = new XceiverClientManager(conf); + Cache cache = + clientManager.getClientCache(); + + // client is added in cache + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(clientManager.getType(), clientManager.getFactor(), + containerOwner); + XceiverClientSpi client1 = + clientManager.acquireClient(container1.getPipeline()); + clientManager.acquireClient(container1.getPipeline()); + Assert.assertEquals(2, client1.getRefcount()); + + // client should be invalidated in the cache + clientManager.releaseClient(client1, true); + Assert.assertEquals(1, client1.getRefcount()); + Assert.assertNull(cache.getIfPresent( + container1.getContainerInfo().getPipelineID().getId().toString() + + container1.getContainerInfo().getReplicationType())); + + // new client should be added in cache + XceiverClientSpi client2 = + clientManager.acquireClient(container1.getPipeline()); + Assert.assertNotEquals(client1, client2); + 
+    Assert.assertEquals(1, client2.getRefcount());
+
+    // on releasing the old client the entry in cache should not be invalidated
+    clientManager.releaseClient(client1, true);
+    Assert.assertEquals(0, client1.getRefcount());
+    Assert.assertNotNull(cache.getIfPresent(
+        container1.getContainerInfo().getPipelineID().getId().toString() +
+        container1.getContainerInfo().getReplicationType()));
   }
 }

From c26d354e7d846e14845c08a378dd77f33284168b Mon Sep 17 00:00:00 2001
From: Bharat Viswanadham
Date: Fri, 18 Jan 2019 13:16:38 -0800
Subject: [PATCH 13/13] HDDS-983. Rename S3Utils to avoid conflict with HDFS
 classes. Contributed by Bharat Viswanadham.

---
 .../ozone/s3/endpoint/ObjectEndpoint.java     |  4 ++--
 ...3utils.java => RangeHeaderParserUtil.java} |  9 ++-------
 ...ls.java => TestRangeHeaderParserUtil.java} | 20 +++++++++----------
 3 files changed, 14 insertions(+), 19 deletions(-)
 rename hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/{S3utils.java => RangeHeaderParserUtil.java} (97%)
 rename hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/{TestS3utils.java => TestRangeHeaderParserUtil.java} (80%)

diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index a9306ff398..0b75e53248 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -60,8 +60,8 @@
 import org.apache.hadoop.ozone.s3.io.S3WrapperInputStream;
 import org.apache.hadoop.ozone.s3.util.RFC1123Util;
 import org.apache.hadoop.ozone.s3.util.RangeHeader;
+import org.apache.hadoop.ozone.s3.util.RangeHeaderParserUtil;
 import org.apache.hadoop.ozone.s3.util.S3StorageType;
-import org.apache.hadoop.ozone.s3.util.S3utils;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.util.Time;
 
@@ -210,7 +210,7 @@ public Response get(
       LOG.debug("range Header provided value is {}", rangeHeaderVal);
 
       if (rangeHeaderVal != null) {
-        rangeHeader = S3utils.parseRangeHeader(rangeHeaderVal,
+        rangeHeader = RangeHeaderParserUtil.parseRangeHeader(rangeHeaderVal,
             length);
         LOG.debug("range Header provided value is {}", rangeHeader);
         if (rangeHeader.isInValidRange()) {
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3utils.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/RangeHeaderParserUtil.java
similarity index 97%
rename from hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3utils.java
rename to hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/RangeHeaderParserUtil.java
index 7b9c7e8e5a..b1b61ccc64 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3utils.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/RangeHeaderParserUtil.java
@@ -27,16 +27,11 @@
  * Utility class for S3.
  */
 @InterfaceAudience.Private
-public final class S3utils {
-
-  private S3utils() {
+public final class RangeHeaderParserUtil {
+  private RangeHeaderParserUtil() {
   }
-
-
-
-
   /**
    * Parse the rangeHeader and set the start and end offset.
    * @param rangeHeaderVal
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3utils.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestRangeHeaderParserUtil.java
similarity index 80%
rename from hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3utils.java
rename to hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestRangeHeaderParserUtil.java
index 14f5d243b0..03c91bfde9 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3utils.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestRangeHeaderParserUtil.java
@@ -23,9 +23,9 @@
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test class to test S3utils.
+ * Test class to test RangeHeaderParserUtil.
  */
-public class TestS3utils {
+public class TestRangeHeaderParserUtil {
 
   @Test
   public void testRangeHeaderParser() {
@@ -34,14 +34,14 @@ public void testRangeHeaderParser() {
 
     //range is with in file length
-    rangeHeader = S3utils.parseRangeHeader("bytes=0-8", 10);
+    rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=0-8", 10);
     assertEquals(0, rangeHeader.getStartOffset());
     assertEquals(8, rangeHeader.getEndOffset());
     assertEquals(false, rangeHeader.isReadFull());
     assertEquals(false, rangeHeader.isInValidRange());
 
     //range is with in file length, both start and end offset are same
-    rangeHeader = S3utils.parseRangeHeader("bytes=0-0", 10);
+    rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=0-0", 10);
     assertEquals(0, rangeHeader.getStartOffset());
     assertEquals(0, rangeHeader.getEndOffset());
     assertEquals(false, rangeHeader.isReadFull());
@@ -49,39 +49,39 @@ public void testRangeHeaderParser() {
 
     //range is not with in file length, both start and end offset are greater
     // than length
-    rangeHeader = S3utils.parseRangeHeader("bytes=11-10", 10);
+    rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=11-10", 10);
     assertEquals(true, rangeHeader.isInValidRange());
 
     // range is satisfying, one of the range is with in the length. So, read
     // full file
-    rangeHeader = S3utils.parseRangeHeader("bytes=11-8", 10);
+    rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=11-8", 10);
     assertEquals(0, rangeHeader.getStartOffset());
     assertEquals(9, rangeHeader.getEndOffset());
     assertEquals(true, rangeHeader.isReadFull());
     assertEquals(false, rangeHeader.isInValidRange());
 
     // bytes spec is wrong
-    rangeHeader = S3utils.parseRangeHeader("mb=11-8", 10);
+    rangeHeader = RangeHeaderParserUtil.parseRangeHeader("mb=11-8", 10);
     assertEquals(0, rangeHeader.getStartOffset());
     assertEquals(9, rangeHeader.getEndOffset());
     assertEquals(true, rangeHeader.isReadFull());
     assertEquals(false, rangeHeader.isInValidRange());
 
     // range specified is invalid
-    rangeHeader = S3utils.parseRangeHeader("bytes=-11-8", 10);
+    rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=-11-8", 10);
     assertEquals(0, rangeHeader.getStartOffset());
     assertEquals(9, rangeHeader.getEndOffset());
     assertEquals(true, rangeHeader.isReadFull());
     assertEquals(false, rangeHeader.isInValidRange());
 
     //Last n bytes
-    rangeHeader = S3utils.parseRangeHeader("bytes=-6", 10);
+    rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=-6", 10);
     assertEquals(4, rangeHeader.getStartOffset());
     assertEquals(9, rangeHeader.getEndOffset());
     assertEquals(false, rangeHeader.isReadFull());
     assertEquals(false, rangeHeader.isInValidRange());
 
-    rangeHeader = S3utils.parseRangeHeader("bytes=-106", 10);
+    rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=-106", 10);
     assertEquals(0, rangeHeader.getStartOffset());
     assertEquals(9, rangeHeader.getEndOffset());
     assertEquals(false, rangeHeader.isInValidRange());