HDFS-13258. Ozone: restructure Hdsl/Ozone code to separated maven subprojects.

Contributed by Elek Marton, Mukul Kumar Singh, Xiaoyu Yao, Ajay Kumar, Anu Engineer, Lokesh Jain, Nanda Kumar.
This commit is contained in:
Anu Engineer 2018-03-15 09:23:33 -07:00 committed by Owen O'Malley
parent 5e7164c614
commit 4e61bc431e
715 changed files with 7579 additions and 3729 deletions

hadoop-cblock/pom.xml Normal file
View File

@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project-dist</artifactId>
<version>3.2.0-SNAPSHOT</version>
<relativePath>../hadoop-project-dist</relativePath>
</parent>
<artifactId>hadoop-cblock</artifactId>
<version>3.2.0-SNAPSHOT</version>
<description>Apache Hadoop Cblock parent project</description>
<name>Apache Hadoop Cblock</name>
<packaging>pom</packaging>
<modules>
<module>server</module>
<module>tools</module>
</modules>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<excludeFilterFile combine.self="override"></excludeFilterFile>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,21 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
<Match>
<Package name="org.apache.hadoop.cblock.protocol.proto"/>
</Match>
</FindBugsFilter>

View File

@ -0,0 +1,169 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-cblock</artifactId>
<version>3.2.0-SNAPSHOT</version>
</parent>
<artifactId>hadoop-cblock-server</artifactId>
<version>3.2.0-SNAPSHOT</version>
<description>Apache Hadoop CBlock Server</description>
<name>Apache Hadoop CBlock Server</name>
<packaging>jar</packaging>
<properties>
<hadoop.component>cblock</hadoop.component>
<is.hadoop.component>true</is.hadoop.component>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdsl-server-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-ozone-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdsl-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdsl-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-ozone-integration-test</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.jscsi</groupId>
<artifactId>target</artifactId>
<version>2.6.0</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>1.0.0-beta1</version>
<exclusions>
<exclusion>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-rules</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/dynamicprovisioner/expected1-pv.json</exclude>
<exclude>src/test/resources/dynamicprovisioner/input1-pvc.json</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>
${basedir}/../../hadoop-common-project/hadoop-common/src/main/proto
</param>
<param>
${basedir}/../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/
</param>
<param>
${basedir}/../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto/
</param>
<param>
${basedir}/../../hadoop-hdsl/common/src/main/proto/
</param>
<param>${basedir}/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/main/proto</directory>
<includes>
<include>CBlockClientServerProtocol.proto</include>
<include>CBlockServiceProtocol.proto</include>
</includes>
</source>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -23,6 +23,7 @@
* This class contains constants for configuration keys used in CBlock.
*/
public final class CBlockConfigKeys {
public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY =
"dfs.cblock.servicerpc-address";
public static final int DFS_CBLOCK_SERVICERPC_PORT_DEFAULT =

View File

@ -35,7 +35,6 @@
import org.apache.hadoop.cblock.protocolPB
.CBlockServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.XceiverClientManager;
@ -46,12 +45,17 @@
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.utils.LevelDBStore;
import static org.apache.hadoop.cblock.CblockUtils.getCblockServerRpcAddr;
import static org.apache.hadoop.cblock.CblockUtils.getCblockServiceRpcAddr;
import static org.apache.hadoop.ozone.web.util.ServerUtils
.updateRPCListenAddress;
import org.iq80.leveldb.DBIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -148,7 +152,7 @@ public CBlockManager(OzoneConfiguration conf,
ProtobufRpcEngine.class);
// start service for client command-to-cblock server service
InetSocketAddress serviceRpcAddr =
OzoneClientUtils.getCblockServiceRpcAddr(conf);
getCblockServiceRpcAddr(conf);
BlockingService cblockProto =
CBlockServiceProtocolProtos
.CBlockServiceProtocolService
@ -161,14 +165,14 @@ public CBlockManager(OzoneConfiguration conf,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
InetSocketAddress cblockServiceRpcAddress =
OzoneClientUtils.updateRPCListenAddress(conf,
updateRPCListenAddress(conf,
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, serviceRpcAddr, cblockService);
LOG.info("CBlock manager listening for client commands on: {}",
cblockServiceRpcAddress);
// now start service for cblock client-to-cblock server communication
InetSocketAddress serverRpcAddr =
OzoneClientUtils.getCblockServerRpcAddr(conf);
getCblockServerRpcAddr(conf);
BlockingService serverProto =
CBlockClientServerProtocolProtos
.CBlockClientServerProtocolService
@ -182,7 +186,7 @@ public CBlockManager(OzoneConfiguration conf,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_KEY,
DFS_CBLOCK_SERVICERPC_HANDLER_COUNT_DEFAULT);
InetSocketAddress cblockServerRpcAddress =
OzoneClientUtils.updateRPCListenAddress(conf,
updateRPCListenAddress(conf,
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, serverRpcAddr, cblockServer);
LOG.info("CBlock server listening for client commands on: {}",
cblockServerRpcAddress);
@ -389,6 +393,7 @@ public synchronized List<VolumeInfo> listVolume(String userName)
public static void main(String[] args) throws Exception {
long version = RPC.getProtocolVersion(
StorageContainerLocationProtocolPB.class);
CblockUtils.activateConfigs();
OzoneConfiguration ozoneConf = new OzoneConfiguration();
String scmAddress = ozoneConf.get(DFS_CBLOCK_SCM_IPADDRESS_KEY,
DFS_CBLOCK_SCM_IPADDRESS_DEFAULT);

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.cblock;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import com.google.common.base.Optional;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSI_PORT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
import static org.apache.hadoop.hdsl.HdslUtils.getHostNameFromConfigKeys;
import static org.apache.hadoop.hdsl.HdslUtils.getPortNumberFromConfigKeys;
/**
* Generic stateless utility functions for CBlock components.
*/
public class CblockUtils {
private CblockUtils() {
}
/**
* Retrieve the socket address that is used by CBlock Service.
*
* @param conf
* @return Target InetSocketAddress for the CBlock Service endpoint.
*/
public static InetSocketAddress getCblockServiceRpcAddr(Configuration conf) {
final Optional<String> host =
getHostNameFromConfigKeys(conf, DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port =
getPortNumberFromConfigKeys(conf, DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + port
.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT));
}
/**
* Retrieve the socket address that is used by CBlock Server.
*
* @param conf
* @return Target InetSocketAddress for the CBlock Server endpoint.
*/
public static InetSocketAddress getCblockServerRpcAddr(Configuration conf) {
final Optional<String> host =
getHostNameFromConfigKeys(conf, DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port =
getPortNumberFromConfigKeys(conf, DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" + port
.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT));
}
/**
* Parse a size string with a size suffix (MB/Mi, GB/Gi, TB/Ti) and return the value in bytes.
*
*/
public static long parseSize(String volumeSizeArgs) throws IOException {
long multiplier = 1;
Pattern p = Pattern.compile("([0-9]+)([a-zA-Z]+)");
Matcher m = p.matcher(volumeSizeArgs);
if (!m.find()) {
throw new IOException("Invalid volume size args " + volumeSizeArgs);
}
int size = Integer.parseInt(m.group(1));
String s = m.group(2);
if (s.equalsIgnoreCase("MB") ||
s.equalsIgnoreCase("Mi")) {
multiplier = 1024L * 1024;
} else if (s.equalsIgnoreCase("GB") ||
s.equalsIgnoreCase("Gi")) {
multiplier = 1024L * 1024 * 1024;
} else if (s.equalsIgnoreCase("TB") ||
s.equalsIgnoreCase("Ti")) {
multiplier = 1024L * 1024 * 1024 * 1024;
} else {
throw new IOException("Invalid volume size args " + volumeSizeArgs);
}
return size * multiplier;
}
public static void activateConfigs(){
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
Configuration.addDefaultResource("ozone-default.xml");
Configuration.addDefaultResource("ozone-site.xml");
Configuration.addDefaultResource("cblock-default.xml");
Configuration.addDefaultResource("cblock-site.xml");
}
}
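A minimal usage sketch of the helpers above (illustrative only, not part of this commit; the class name CblockUtilsExample is hypothetical):

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.cblock.CblockUtils;
import org.apache.hadoop.conf.Configuration;

public class CblockUtilsExample {
  public static void main(String[] args) throws IOException {
    // Register the hdfs/ozone/cblock default and site resources first,
    // so that dfs.cblock.* keys resolve against cblock-default.xml.
    CblockUtils.activateConfigs();
    Configuration conf = new Configuration();
    // Falls back to DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT and
    // DFS_CBLOCK_SERVICERPC_PORT_DEFAULT when the key is unset.
    InetSocketAddress serviceAddr =
        CblockUtils.getCblockServiceRpcAddr(conf);
    // "1GB" and "1Gi" both parse to 1024L * 1024 * 1024 bytes;
    // an unknown suffix raises an IOException.
    long bytes = CblockUtils.parseSize("1GB");
    System.out.println(serviceAddr + " " + bytes);
  }
}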

View File

@ -17,21 +17,22 @@
*/
package org.apache.hadoop.cblock.client;
import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.cblock.CblockUtils.getCblockServiceRpcAddr;
/**
* Implementation of client used by CBlock command line tool.
*/
@ -45,7 +46,7 @@ public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
public CBlockVolumeClient(OzoneConfiguration conf,
InetSocketAddress serverAddress) throws IOException {
InetSocketAddress address = serverAddress != null ? serverAddress :
OzoneClientUtils.getCblockServiceRpcAddr(conf);
getCblockServiceRpcAddr(conf);
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
int rpcTimeout = Math.toIntExact(
conf.getTimeDuration(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT,

View File

@ -20,7 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.util.KeyUtil;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.scm.XceiverClientManager;
import org.jscsi.target.Configuration;
import org.jscsi.target.Target;

View File

@ -23,13 +23,15 @@
.DFS_CBLOCK_ISCSI_ADVERTISED_PORT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT;
import org.apache.hadoop.cblock.CblockUtils;
import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.client.ContainerOperationClient;
import org.apache.hadoop.security.UserGroupInformation;
@ -59,6 +61,7 @@
*/
public final class SCSITargetDaemon {
public static void main(String[] args) throws Exception {
CblockUtils.activateConfigs();
OzoneConfiguration ozoneConf = new OzoneConfiguration();
RPC.setProtocolEngine(ozoneConf, CBlockClientServerProtocolPB.class,

View File

@ -22,7 +22,7 @@
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;

View File

@ -33,11 +33,12 @@
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import okio.Buffer;
import org.apache.hadoop.cblock.cli.CBlockCli;
import org.apache.hadoop.cblock.CblockUtils;
import org.apache.hadoop.cblock.exception.CBlockException;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.storage.StorageManager;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -185,7 +186,7 @@ public void run() {
String volumeName = createVolumeName(claim);
long size = CBlockCli.parseSize(
long size = CblockUtils.parseSize(
claim.getSpec().getResources().getRequests().get("storage"));
createCBlock(volumeName, size);

View File

@ -25,8 +25,8 @@
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.util.KeyUtil;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.slf4j.Logger;
@ -187,8 +187,8 @@ public void run() {
ContainerDescriptor container = null;
try {
Pipeline pipeline = storageClient.createContainer(
OzoneProtos.ReplicationType.STAND_ALONE,
OzoneProtos.ReplicationFactor.ONE,
HdslProtos.ReplicationType.STAND_ALONE,
HdslProtos.ReplicationFactor.ONE,
KeyUtil.getContainerName(volume.getUserName(),
volume.getVolumeName(), containerIdx), cblockId);

View File

@ -27,7 +27,7 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.cblock;
import "Ozone.proto";
import "hdsl.proto";
import "CBlockServiceProtocol.proto";
/**
* This message is sent from CBlock client side to CBlock server to
@ -69,7 +69,7 @@ message ContainerIDProto {
required string containerID = 1;
required uint64 index = 2;
// making pipeline optional to be compatible with existing tests
optional hadoop.hdfs.ozone.Pipeline pipeline = 3;
optional hadoop.hdsl.Pipeline pipeline = 3;
}
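For illustration only (not in this commit): a hypothetical helper showing how this message is built after the import rename. The generated outer class CBlockClientServerProtocolProtos is referenced elsewhere in this diff; the setter names assume standard protoc output for the fields above.

// Hypothetical sketch; builder/setter names assume standard protoc output.
static CBlockClientServerProtocolProtos.ContainerIDProto makeContainerId(
    String name, long index) {
  return CBlockClientServerProtocolProtos.ContainerIDProto.newBuilder()
      .setContainerID(name)   // required string containerID = 1
      .setIndex(index)        // required uint64 index = 2
      .build();               // pipeline (hadoop.hdsl.Pipeline) left unset
}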

View File

@ -0,0 +1,347 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Do not modify this file directly. Instead, copy entries that you -->
<!-- wish to modify from this file into ozone-site.xml and change them -->
<!-- there. If ozone-site.xml does not already exist, create it. -->
<!--Tags supported are OZONE, CBLOCK, MANAGEMENT, SECURITY, PERFORMANCE, -->
<!--DEBUG, CLIENT, SERVER, KSM, SCM, CRITICAL, RATIS, CONTAINER, REQUIRED, -->
<!--REST, STORAGE, PIPELINE, STANDALONE -->
<configuration>
<!--CBlock Settings-->
<property>
<name>dfs.cblock.block.buffer.flush.interval</name>
<value>60s</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
Controls the frequency at which the local cache flushes blocks
to the remote containers.
</description>
</property>
<property>
<name>dfs.cblock.cache.block.buffer.size</name>
<value>512</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
Size of the local cache, in blocks; the total cache size is the
block size multiplied by this number.
</description>
</property>
<property>
<name>dfs.cblock.cache.core.min.pool.size</name>
<value>16</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
The minimum number of threads in the pool that the cBlock cache
will use for background I/O to remote containers.
</description>
</property>
<property>
<name>dfs.cblock.cache.max.pool.size</name>
<value>256</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
Maximum number of threads in the pool that cBlock cache will
use for background I/O to remote containers.
</description>
</property>
<property>
<name>dfs.cblock.cache.keep.alive</name>
<value>60s</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
If the cblock cache has no I/O, then the threads in the cache
pool are kept idle for this amount of time before shutting down.
</description>
</property>
<property>
<name>dfs.cblock.cache.leveldb.cache.size.mb</name>
<value>256</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
The amount of physical memory allocated to the local cache. The
SCSI driver will allocate this much RAM for its cache instances.
</description>
</property>
<property>
<name>dfs.cblock.cache.max.retry</name>
<value>65536</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
If the local cache is enabled, CBlock writes to the local cache
when I/O happens, and background I/O threads then write the block
to the remote containers. This value controls how many times a
background thread should attempt I/O to the remote containers
before giving up.
</description>
</property>
<property>
<name>dfs.cblock.cache.queue.size.in.kb</name>
<value>256</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
Size of the in-memory cache queue that is flushed to local
disk.
</description>
</property>
<property>
<name>dfs.cblock.cache.thread.priority</name>
<value>5</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
Priority of the cache flusher thread, which affects the relative
performance of writes and reads. Supported values are 1, 5, and 10;
use 10 for high priority and 1 for low priority.
</description>
</property>
<property>
<name>dfs.cblock.container.size.gb</name>
<value>5</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
The size of an Ozone container in GB. Note that this does not
set the container size for Ozone itself; it instructs CBlock to
manage containers at a standard size.
</description>
</property>
<property>
<name>dfs.cblock.disk.cache.path</name>
<value>${hadoop.tmp.dir}/cblockCacheDB</value>
<tag>CBLOCK, REQUIRED</tag>
<description>
The default path for the cblock local cache. If the cblock
local cache is enabled, then it must be set to a valid path. This cache
*should* be mapped to the fastest disk on a given machine; for example,
an SSD drive would be a good choice. Currently, all mounted disks on a
data node are mapped to a single path, so having a large number of IOPS
is essential.
</description>
</property>
<property>
<name>dfs.cblock.jscsi-address</name>
<value/>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
The address that cblock will bind to, in host:port format.
This setting is required for the cblock server to start.
This address is used by jscsi to mount volumes.
</description>
</property>
<property>
<name>dfs.cblock.jscsi.cblock.server.address</name>
<value>127.0.0.1</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
The address the local jscsi server will use to talk to the cblock manager.
</description>
</property>
<property>
<name>dfs.cblock.jscsi.port</name>
<value>9811</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
The port on the CBlockManager node for jSCSI to talk to.
</description>
</property>
<property>
<name>dfs.cblock.jscsi.rpc-bind-host</name>
<value>0.0.0.0</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
The actual address the cblock jscsi rpc server will bind to. If
this optional address is set, it overrides only the hostname portion of
dfs.cblock.jscsi-address.
</description>
</property>
<property>
<name>dfs.cblock.jscsi.server.address</name>
<value>0.0.0.0</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
The address on which the jscsi server will run. It is useful to
have one local jscsi server for each client (Linux jSCSI client)
that tries to mount cblock.
</description>
</property>
<property>
<name>dfs.cblock.manager.pool.size</name>
<value>16</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
Number of active threads that the cblock manager will use for
container operations. The maximum number of threads is limited to
the processor count * 2.
</description>
</property>
<property>
<name>dfs.cblock.rpc.timeout</name>
<value>300s</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
RPC timeout used for cblock CLI operations. When you create a
very large disk, such as 5TB, the number of containers allocated
in the system is huge: 5TB/5GB is 1000 containers. The client CLI
might time out even though the cblock manager creates the specified
disk. This value allows the user to wait for a longer period.
</description>
</property>
<property>
<name>dfs.cblock.scm.ipaddress</name>
<value>127.0.0.1</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
IP address used by cblock to connect to SCM.
</description>
</property>
<property>
<name>dfs.cblock.scm.port</name>
<value>9860</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
Port used by cblock to connect to SCM.
</description>
</property>
<property>
<name>dfs.cblock.service.handler.count</name>
<value>10</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
Default number of handlers for CBlock service rpc.
</description>
</property>
<property>
<name>dfs.cblock.service.leveldb.path</name>
<value>${hadoop.tmp.dir}/cblock_server.dat</value>
<tag>CBLOCK, REQUIRED</tag>
<description>
Default path for the cblock meta data store.
</description>
</property>
<property>
<name>dfs.cblock.service.rpc-bind-host</name>
<value>0.0.0.0</value>
<tag>CBLOCK, MANAGEMENT</tag>
<description>
The actual address the cblock service RPC server will bind to.
If the optional address is set, it overrides only the hostname portion of
dfs.cblock.servicerpc-address.
</description>
</property>
<property>
<name>dfs.cblock.servicerpc-address</name>
<value/>
<tag>CBLOCK, MANAGEMENT, REQUIRED</tag>
<description>
The address that cblock will bind to, in host:port format.
This setting is required for the cblock server to start.
This address is used for cblock management operations like create,
delete, info and list volumes.
</description>
</property>
<property>
<name>dfs.cblock.short.circuit.io</name>
<value>false</value>
<tag>CBLOCK, PERFORMANCE</tag>
<description>
Enables use of the local cache in cblock. When enabled, I/O
happens against the local cache while background threads do the
actual I/O against the containers.
</description>
</property>
<property>
<name>dfs.cblock.trace.io</name>
<value>false</value>
<tag>CBLOCK, DEBUG</tag>
<description>Default flag for enabling trace I/O. Trace I/O logs all
I/O with hashes of the data, which is useful for detecting things
like data corruption.
</description>
</property>
<property>
<name>dfs.cblock.iscsi.advertised.ip</name>
<value>0.0.0.0</value>
<tag>CBLOCK</tag>
<description>
IP address returned during the iscsi discovery.
</description>
</property>
<property>
<name>dfs.cblock.iscsi.advertised.port</name>
<value>3260</value>
<tag>CBLOCK</tag>
<description>
TCP port returned during the iscsi discovery.
</description>
</property>
<property>
<name>dfs.cblock.kubernetes.dynamic-provisioner.enabled</name>
<value>false</value>
<tag>CBLOCK, KUBERNETES</tag>
<description>Flag to enable automatic creation of cblocks and
kubernetes PersistentVolumes in a kubernetes environment.
</description>
</property>
<property>
<name>dfs.cblock.kubernetes.cblock-user</name>
<value>iqn.2001-04.org.apache.hadoop</value>
<tag>CBLOCK, KUBERNETES</tag>
<description>CBlock user to use for the dynamic provisioner.
This user will own all of the auto-created cblocks.
</description>
</property>
<property>
<name>dfs.cblock.kubernetes.configfile</name>
<value></value>
<tag>CBLOCK, KUBERNETES</tag>
<description>Location of the kubernetes configuration file
to access the kubernetes cluster. Not required inside a pod,
as the default service account will be used if this value is
empty.
</description>
</property>
<property>
<name>dfs.cblock.iscsi.advertised.ip</name>
<value></value>
<tag>CBLOCK, KUBERNETES</tag>
<description>IP where the cblock target server is available
from the kubernetes nodes. Usually it is a cluster IP address
defined by a deployed Service.
</description>
</property>
<property>
<name>dfs.cblock.iscsi.advertised.port</name>
<value>3260</value>
<tag>CBLOCK, KUBERNETES</tag>
<description>Port where the cblock target server is available
from the kubernetes nodes. Could be different from the
listening port if jscsi is behind a Service.
</description>
</property>
</configuration>
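A minimal sketch (not part of the commit) of how these defaults are consumed, mirroring the CBlockManager and CBlockVolumeClient changes earlier in this diff; the class name and the 300000 ms fallback are illustrative:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.CblockUtils;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;

public class CblockConfigExample {
  public static void main(String[] args) {
    // Makes cblock-default.xml / cblock-site.xml (and the hdfs and
    // ozone resources) visible to new Configuration objects.
    CblockUtils.activateConfigs();
    OzoneConfiguration conf = new OzoneConfiguration();
    // dfs.cblock.rpc.timeout is "300s" above; getTimeDuration parses
    // the unit suffix, as CBlockVolumeClient does in this diff.
    long rpcTimeoutMs = conf.getTimeDuration(
        CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT,
        300000, TimeUnit.MILLISECONDS);
    System.out.println("cblock rpc timeout (ms): " + rpcTimeoutMs);
  }
}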

View File

@ -23,16 +23,16 @@
import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.cblock;
import org.apache.hadoop.conf.TestConfigurationFieldsBase;
/**
* Tests whether the configuration constants are documented in cblock-default.xml.
*/
public class TestCBlockConfigurationFields extends TestConfigurationFieldsBase {
@Override
public void initializeMemberVariables() {
xmlFilename = "cblock-default.xml";
configurationClasses =
new Class[] {CBlockConfigKeys.class};
errorIfMissingConfigProps = true;
errorIfMissingXmlProps = true;
}
}

View File

@ -24,14 +24,14 @@
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationFactor;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationType;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;

View File

@ -19,9 +19,9 @@
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.cblock.util.MockStorageClient;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.cblock;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.cblock.util.MockStorageClient;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.junit.Test;
import java.io.File;

View File

@ -25,10 +25,10 @@
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;

View File

@ -23,13 +23,14 @@
import io.kubernetes.client.models.V1PersistentVolumeClaim;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_ISCSI_ADVERTISED_IP;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.junit.Assert;
import org.junit.Test;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
/**
* Test the resource generation of Dynamic Provisioner.
*/

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.cblock.util;
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -88,7 +88,7 @@ public List<ContainerInfo> listContainer(String startName,
ContainerInfo container = new ContainerInfo.Builder()
.setContainerName(containerDescriptor.getContainerID())
.setPipeline(containerDescriptor.getPipeline())
.setState(OzoneProtos.LifeCycleState.ALLOCATED)
.setState(HdslProtos.LifeCycleState.ALLOCATED)
.build();
containerList.add(container);
return containerList;
@ -134,8 +134,8 @@ public long getContainerSize(Pipeline pipeline) throws IOException {
}
@Override
public Pipeline createContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor, String containerId,
public Pipeline createContainer(HdslProtos.ReplicationType type,
HdslProtos.ReplicationFactor replicationFactor, String containerId,
String owner) throws IOException {
int contId = currentContainerId.getAndIncrement();
ContainerLookUpService.addContainer(Long.toString(contId));
@ -153,8 +153,8 @@ public Pipeline createContainer(OzoneProtos.ReplicationType type,
* @throws IOException
*/
@Override
public OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState>
nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName)
public HdslProtos.NodePool queryNode(EnumSet<HdslProtos.NodeState>
nodeStatuses, HdslProtos.QueryScope queryScope, String poolName)
throws IOException {
return null;
}
@ -168,8 +168,8 @@ public OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState>
* @throws IOException
*/
@Override
public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
public Pipeline createReplicationPipeline(HdslProtos.ReplicationType type,
HdslProtos.ReplicationFactor factor, HdslProtos.NodePool nodePool)
throws IOException {
return null;
}

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-cblock</artifactId>
<version>3.2.0-SNAPSHOT</version>
</parent>
<artifactId>hadoop-cblock-tools</artifactId>
<version>3.2.0-SNAPSHOT</version>
<description>Apache Hadoop CBlock Tools</description>
<name>Apache Hadoop CBlock Tools</name>
<packaging>jar</packaging>
<properties>
<hadoop.component>cblock</hadoop.component>
<is.hadoop.component>true</is.hadoop.component>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-cblock-server</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -24,15 +24,18 @@
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.cblock.CblockUtils;
import org.apache.hadoop.cblock.client.CBlockVolumeClient;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,8 +44,6 @@
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* The command line tool class.
@ -194,6 +195,7 @@ public int run(String[] args) throws ParseException, IOException {
}
public static void main(String[] argv) throws Exception {
CblockUtils.activateConfigs();
OzoneConfiguration cblockConf = new OzoneConfiguration();
RPC.setProtocolEngine(cblockConf, CBlockServiceProtocolPB.class,
ProtobufRpcEngine.class);
@ -208,38 +210,12 @@ public static void main(String[] argv) throws Exception {
System.exit(res);
}
public static long parseSize(String volumeSizeArgs) throws IOException {
long multiplier = 1;
Pattern p = Pattern.compile("([0-9]+)([a-zA-Z]+)");
Matcher m = p.matcher(volumeSizeArgs);
if (!m.find()) {
throw new IOException("Invalid volume size args " + volumeSizeArgs);
}
int size = Integer.parseInt(m.group(1));
String s = m.group(2);
if (s.equalsIgnoreCase("MB") ||
s.equalsIgnoreCase("Mi")) {
multiplier = 1024L * 1024;
} else if (s.equalsIgnoreCase("GB") ||
s.equalsIgnoreCase("Gi")) {
multiplier = 1024L * 1024 * 1024;
} else if (s.equalsIgnoreCase("TB") ||
s.equalsIgnoreCase("Ti")) {
multiplier = 1024L * 1024 * 1024 * 1024;
} else {
throw new IOException("Invalid volume size args " + volumeSizeArgs);
}
return size * multiplier;
}
private void createVolume(String[] createArgs) throws IOException {
String userName = createArgs[0];
String volumeName = createArgs[1];
long volumeSize = parseSize(createArgs[2]);
long volumeSize = CblockUtils.parseSize(createArgs[2]);
int blockSize = Integer.parseInt(createArgs[3])*1024;
localProxy.createVolume(userName, volumeName, volumeSize, blockSize);
}

View File

@ -596,6 +596,13 @@ function hadoop_bootstrap
YARN_LIB_JARS_DIR=${YARN_LIB_JARS_DIR:-"share/hadoop/yarn/lib"}
MAPRED_DIR=${MAPRED_DIR:-"share/hadoop/mapreduce"}
MAPRED_LIB_JARS_DIR=${MAPRED_LIB_JARS_DIR:-"share/hadoop/mapreduce/lib"}
HDSL_DIR=${HDSL_DIR:-"share/hadoop/hdsl"}
HDSL_LIB_JARS_DIR=${HDSL_LIB_JARS_DIR:-"share/hadoop/hdsl/lib"}
OZONE_DIR=${OZONE_DIR:-"share/hadoop/ozone"}
OZONE_LIB_JARS_DIR=${OZONE_LIB_JARS_DIR:-"share/hadoop/ozone/lib"}
CBLOCK_DIR=${CBLOCK_DIR:-"share/hadoop/cblock"}
CBLOCK_LIB_JARS_DIR=${CBLOCK_LIB_JARS_DIR:-"share/hadoop/cblock/lib"}
HADOOP_TOOLS_HOME=${HADOOP_TOOLS_HOME:-${HADOOP_HOME}}
HADOOP_TOOLS_DIR=${HADOOP_TOOLS_DIR:-"share/hadoop/tools"}
HADOOP_TOOLS_LIB_JARS_DIR=${HADOOP_TOOLS_LIB_JARS_DIR:-"${HADOOP_TOOLS_DIR}/lib"}

View File

@ -68,6 +68,44 @@
<artifactId>hadoop-client-integration-tests</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-ozone-ozone-manager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdsl-server-scm</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdsl-tools</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-cblock-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdsl-container-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-ozone-objectstore-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdsl-tools</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-ozone-tools</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-cblock-tools</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -17,7 +17,7 @@
version: "3"
services:
namenode:
image: elek/hadoop-runner:latest
image: elek/hadoop-runner:o3-refactor
hostname: namenode
volumes:
- ../..//hadoop-${VERSION}:/opt/hadoop
@ -29,36 +29,38 @@ services:
- ./docker-config
command: ["/opt/hadoop/bin/hdfs","namenode"]
datanode:
image: elek/hadoop-runner:latest
image: elek/hadoop-runner:o3-refactor
volumes:
- ../..//hadoop-${VERSION}:/opt/hadoop
ports:
- 9864
command: ["/opt/hadoop/bin/hdfs","datanode"]
command: ["/opt/hadoop/bin/oz","datanode"]
env_file:
- ./docker-config
jscsi:
image: elek/hadoop-runner:latest
image: elek/hadoop-runner:o3-refactor
ports:
- 3260:3260
volumes:
- ../..//hadoop-${VERSION}:/opt/hadoop
env_file:
- ./docker-config
command: ["/opt/hadoop/bin/hdfs","jscsi"]
command: ["/opt/hadoop/bin/oz","jscsi"]
cblock:
image: elek/hadoop-runner:latest
image: elek/hadoop-runner:o3-refactor
volumes:
- ../..//hadoop-${VERSION}:/opt/hadoop
env_file:
- ./docker-config
command: ["/opt/hadoop/bin/hdfs","cblockserver"]
command: ["/opt/hadoop/bin/oz","cblockserver"]
scm:
image: elek/hadoop-runner:latest
image: elek/hadoop-runner:o3-refactor
volumes:
- ../..//hadoop-${VERSION}:/opt/hadoop
ports:
- 9876:9876
env_file:
- ./docker-config
command: ["/opt/hadoop/bin/hdfs","scm"]
command: ["/opt/hadoop/bin/oz","scm"]
environment:
ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION

View File

@ -27,7 +27,7 @@ OZONE-SITE.XML_ozone.scm.client.address=scm
OZONE-SITE.XML_dfs.cblock.jscsi.cblock.server.address=cblock
OZONE-SITE.XML_dfs.cblock.scm.ipaddress=scm
OZONE-SITE.XML_dfs.cblock.service.leveldb.path=/tmp
HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.HdslServerPlugin,org.apache.hadoop.ozone.web.ObjectStoreRestPlugin
HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:9000
HDFS-SITE.XML_dfs.namenode.name.dir=/data/namenode
HDFS-SITE.XML_rpc.metrics.quantile.enable=true

View File

@ -17,7 +17,7 @@
version: "3"
services:
namenode:
image: elek/hadoop-runner:latest
image: elek/hadoop-runner:o3-refactor
hostname: namenode
volumes:
- ../..//hadoop-${VERSION}:/opt/hadoop
@ -29,14 +29,16 @@ services:
- ./docker-config
command: ["/opt/hadoop/bin/hdfs","namenode"]
datanode:
image: elek/hadoop-runner:latest
image: elek/hadoop-runner:o3-refactor
volumes:
- ../..//hadoop-${VERSION}:/opt/hadoop
ports:
- 9864
command: ["/opt/hadoop/bin/hdfs","datanode"]
command: ["/opt/hadoop/bin/oz","datanode"]
env_file:
- ./docker-config
ksm:
image: elek/hadoop-runner:latest
image: elek/hadoop-runner:o3-refactor
volumes:
- ../..//hadoop-${VERSION}:/opt/hadoop
ports:
@ -45,9 +47,9 @@ services:
ENSURE_KSM_INITIALIZED: /data/metadata/ksm/current/VERSION
env_file:
- ./docker-config
command: ["/opt/hadoop/bin/hdfs","ksm"]
command: ["/opt/hadoop/bin/oz","ksm"]
scm:
image: elek/hadoop-runner:latest
image: elek/hadoop-runner:o3-refactor
volumes:
- ../..//hadoop-${VERSION}:/opt/hadoop
ports:
@ -56,4 +58,4 @@ services:
- ./docker-config
environment:
ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION
command: ["/opt/hadoop/bin/hdfs","scm"]
command: ["/opt/hadoop/bin/oz","scm"]

View File

@ -27,6 +27,7 @@ HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:9000
HDFS-SITE.XML_dfs.namenode.name.dir=/data/namenode
HDFS-SITE.XML_rpc.metrics.quantile.enable=true
HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
HDFS-SITE.XML_dfs.datanode.plugins=org.apache.hadoop.ozone.HdslServerPlugin,org.apache.hadoop.ozone.web.ObjectStoreRestPlugin
LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout
LOG4J.PROPERTIES_log4j.appender.stdout=org.apache.log4j.ConsoleAppender
LOG4J.PROPERTIES_log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

View File

@ -113,25 +113,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<artifactId>ratis-server</artifactId>
<groupId>org.apache.ratis</groupId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<artifactId>ratis-netty</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
<dependency>
<artifactId>ratis-grpc</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
</dependencies>
<build>

View File

@ -336,7 +336,9 @@ public static DatanodeIDProto convert(DatanodeID dn) {
dn.getDatanodeUuid() : "")
.setInfoPort(dn.getInfoPort())
.setInfoSecurePort(dn.getInfoSecurePort())
.setIpcPort(dn.getIpcPort()).build();
.setIpcPort(dn.getIpcPort())
.setOzoneRestPort(dn.getOzoneRestPort())
.build();
}
public static DatanodeInfoProto.AdminState convert(
@ -786,9 +788,13 @@ public static Token<BlockTokenIdentifier> convert(
// DatanodeId
public static DatanodeID convert(DatanodeIDProto dn) {
return new DatanodeID(dn.getIpAddr(), dn.getHostName(),
dn.getDatanodeUuid(), dn.getXferPort(), dn.getInfoPort(),
dn.hasInfoSecurePort() ? dn.getInfoSecurePort() : 0, dn.getIpcPort());
DatanodeID datanodeID =
new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getDatanodeUuid(),
dn.getXferPort(), dn.getInfoPort(),
dn.hasInfoSecurePort() ? dn.getInfoSecurePort() : 0,
dn.getIpcPort());
datanodeID.setOzoneRestPort(dn.getOzoneRestPort());
return datanodeID;
}
public static AdminStates convert(AdminState adminState) {

View File

@ -1,874 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_JSCSI_PORT_DEFAULT;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_BIND_HOST_DEFAULT;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_PORT_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_STALENODE_INTERVAL;
/**
* Utility methods for Ozone and Container Clients.
*
* The methods to retrieve SCM service endpoints assume there is a single
* SCM service instance. This will change when we switch to replicated service
* instances for redundancy.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class OzoneClientUtils {
private static final Logger LOG = LoggerFactory.getLogger(
OzoneClientUtils.class);
private static final int NO_PORT = -1;
/**
* Date format that used in ozone. Here the format is thread safe to use.
*/
private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT =
ThreadLocal.withInitial(() -> {
DateTimeFormatter format =
DateTimeFormatter.ofPattern(OzoneConsts.OZONE_DATE_FORMAT);
return format.withZone(ZoneId.of(OzoneConsts.OZONE_TIME_ZONE));
});
/**
* The service ID of the solitary Ozone SCM service.
*/
public static final String OZONE_SCM_SERVICE_ID = "OzoneScmService";
public static final String OZONE_SCM_SERVICE_INSTANCE_ID =
"OzoneScmServiceInstance";
private OzoneClientUtils() {
// Never constructed
}
/**
* Retrieve the socket addresses of all storage container managers.
*
* @param conf
* @return A collection of SCM addresses
* @throws IllegalArgumentException If the configuration is invalid
*/
public static Collection<InetSocketAddress> getSCMAddresses(
Configuration conf) throws IllegalArgumentException {
Collection<InetSocketAddress> addresses =
new HashSet<InetSocketAddress>();
Collection<String> names =
conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
if (names == null || names.isEmpty()) {
throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES
+ " need to be a set of valid DNS names or IP addresses."
+ " Null or empty address list found.");
}
final com.google.common.base.Optional<Integer>
defaultPort = com.google.common.base.Optional.of(ScmConfigKeys
.OZONE_SCM_DEFAULT_PORT);
for (String address : names) {
com.google.common.base.Optional<String> hostname =
OzoneClientUtils.getHostName(address);
if (!hostname.isPresent()) {
throw new IllegalArgumentException("Invalid hostname for SCM: "
+ hostname);
}
com.google.common.base.Optional<Integer> port =
OzoneClientUtils.getHostPort(address);
InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(),
port.or(defaultPort.get()));
addresses.add(addr);
}
return addresses;
}
/**
* Retrieve the socket address that should be used by clients to connect
* to the SCM.
*
* @param conf
* @return Target InetSocketAddress for the SCM client endpoint.
*/
public static InetSocketAddress getScmAddressForClients(Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
if (!host.isPresent()) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY +
" must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration for details" +
" on configuring Ozone.");
}
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
return NetUtils.createSocketAddr(host.get() + ":" +
port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
}
/**
* Retrieve the socket address that should be used by clients to connect
* to the SCM for block service. If
* {@link ScmConfigKeys#OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY} is not defined
* then {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} is used.
*
* @param conf configuration
* @return Target InetSocketAddress for the SCM block client endpoint.
* @throws IllegalArgumentException if configuration is not defined.
*/
public static InetSocketAddress getScmAddressForBlockClients(
Configuration conf) {
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
if (!host.isPresent()) {
host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
if (!host.isPresent()) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY +
" must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration" +
" for details on configuring Ozone.");
}
}
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
return NetUtils.createSocketAddr(host.get() + ":" +
port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
}
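// Fallback sketch for the method above (hypothetical values): when only the
// client address key is set, block clients reuse its host, but the port is
// read solely from the block client key and so falls back to its default.
//
//   Configuration conf = new OzoneConfiguration();
//   conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY,
//       "scm.example.com:9860");
//   InetSocketAddress addr = getScmAddressForBlockClients(conf);
//   // addr == scm.example.com:OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT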
/**
* Retrieve the socket address that should be used by DataNodes to connect
* to the SCM.
*
* @param conf configuration
* @return Target InetSocketAddress for the SCM service endpoint.
*/
public static InetSocketAddress getScmAddressForDataNodes(
Configuration conf) {
// We try the following settings in decreasing priority to retrieve the
// target host.
// - OZONE_SCM_DATANODE_ADDRESS_KEY
// - OZONE_SCM_CLIENT_ADDRESS_KEY
//
final Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
if (!host.isPresent()) {
throw new IllegalArgumentException(
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY +
" must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration for details" +
" on configuring Ozone.");
}
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);
InetSocketAddress addr = NetUtils.createSocketAddr(host.get() + ":" +
port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
return addr;
}
/**
* Retrieve the socket address that the SCM's client-facing RPC server
* should bind to.
*
* @param conf configuration
* @return Bind InetSocketAddress for the SCM client endpoint.
*/
public static InetSocketAddress getScmClientBindAddress(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY);
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" +
port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
}
/**
* Retrieve the socket address that the SCM's block-client-facing RPC
* server should bind to.
*
* @param conf configuration
* @return Bind InetSocketAddress for the SCM block client endpoint.
*/
public static InetSocketAddress getScmBlockClientBindAddress(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY);
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT) +
":" + port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
}
/**
* Retrieve the socket address that the SCM's DataNode-facing RPC server
* should bind to.
*
* @param conf configuration
* @return Bind InetSocketAddress for the SCM DataNode endpoint.
*/
public static InetSocketAddress getScmDataNodeBindAddress(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY);
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" +
port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
}
/**
* Retrieve the socket address that is used by KSM.
* @param conf configuration
* @return Target InetSocketAddress for the KSM service endpoint.
*/
public static InetSocketAddress getKsmAddress(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
OZONE_KSM_ADDRESS_KEY);
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
OZONE_KSM_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(OZONE_KSM_BIND_HOST_DEFAULT) + ":" +
port.or(OZONE_KSM_PORT_DEFAULT));
}
/**
* Retrieve the socket address that should be used by clients to connect
* to KSM.
* @param conf configuration
* @return Target InetSocketAddress for the KSM service endpoint.
*/
public static InetSocketAddress getKsmAddressForClients(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
OZONE_KSM_ADDRESS_KEY);
if (!host.isPresent()) {
throw new IllegalArgumentException(
OZONE_KSM_ADDRESS_KEY + " must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration for" +
" details on configuring Ozone.");
}
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
OZONE_KSM_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.get() + ":" + port.or(OZONE_KSM_PORT_DEFAULT));
}
/**
* Retrieve the socket address that is used by CBlock Service.
* @param conf configuration
* @return Target InetSocketAddress for the CBlock Service endpoint.
*/
public static InetSocketAddress getCblockServiceRpcAddr(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
DFS_CBLOCK_SERVICERPC_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
port.or(DFS_CBLOCK_SERVICERPC_PORT_DEFAULT));
}
/**
* Retrieve the socket address that is used by CBlock Server.
* @param conf configuration
* @return Target InetSocketAddress for the CBlock Server endpoint.
*/
public static InetSocketAddress getCblockServerRpcAddr(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.or(DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT) + ":" +
port.or(DFS_CBLOCK_JSCSI_PORT_DEFAULT));
}
/**
* Retrieve the hostname, trying the supplied config keys in order.
* Each config value may be absent; if present, it must be in
* host:port format (the :port part is optional).
*
* @param conf - Conf
* @param keys a list of configuration key names.
*
* @return first hostname component found from the given keys, or absent.
* @throws IllegalArgumentException if any values are not in the 'host'
* or host:port format.
*/
public static Optional<String> getHostNameFromConfigKeys(Configuration conf,
String... keys) {
for (final String key : keys) {
final String value = conf.getTrimmed(key);
final Optional<String> hostName = getHostName(value);
if (hostName.isPresent()) {
return hostName;
}
}
return Optional.absent();
}
/**
* Gets the hostname, or indicates that it is absent.
* @param value host or host:port
* @return hostname, or absent if the value is null or empty
*/
public static Optional<String> getHostName(String value) {
if ((value == null) || value.isEmpty()) {
return Optional.absent();
}
return Optional.of(HostAndPort.fromString(value).getHostText());
}
/**
* Gets the port if there is one; returns absent otherwise.
* @param value String in host:port format.
* @return Port, or absent if no port was specified.
*/
public static Optional<Integer> getHostPort(String value) {
if ((value == null) || value.isEmpty()) {
return Optional.absent();
}
int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT);
if (port == NO_PORT) {
return Optional.absent();
} else {
return Optional.of(port);
}
}
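// Parsing sketch for the two helpers above (inputs are illustrative):
//
//   getHostName("scm.example.com:9863")  => Optional.of("scm.example.com")
//   getHostPort("scm.example.com:9863")  => Optional.of(9863)
//   getHostName("scm.example.com")       => Optional.of("scm.example.com")
//   getHostPort("scm.example.com")       => Optional.absent()  // no port
//   getHostPort(null)                    => Optional.absent()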
/**
* Returns the cache value to be used for list calls.
* @param conf Configuration object
* @return list cache size
*/
public static int getListCacheSize(Configuration conf) {
return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE,
OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT);
}
/**
* Retrieve the port number, trying the supplied config keys in order.
* Each config value may be absent; if present, it must be in
* host:port format (the :port part is optional).
*
* @param conf Conf
* @param keys a list of configuration key names.
*
* @return first port number component found from the given keys, or absent.
* @throws IllegalArgumentException if any values are not in the 'host'
* or host:port format.
*/
public static Optional<Integer> getPortNumberFromConfigKeys(
Configuration conf, String... keys) {
for (final String key : keys) {
final String value = conf.getTrimmed(key);
final Optional<Integer> hostPort = getHostPort(value);
if (hostPort.isPresent()) {
return hostPort;
}
}
return Optional.absent();
}
/**
* Return the service addresses for the Ozone SCM. This method is used by
* the DataNodes to determine the service instances to connect to.
*
* @param conf configuration
* @return map of SCM service IDs to their instance addresses.
*/
public static Map<String, ? extends Map<String, InetSocketAddress>>
getScmServiceRpcAddresses(Configuration conf) {
final Map<String, InetSocketAddress> serviceInstances = new HashMap<>();
serviceInstances.put(OZONE_SCM_SERVICE_INSTANCE_ID,
getScmAddressForDataNodes(conf));
final Map<String, Map<String, InetSocketAddress>> services =
new HashMap<>();
services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
return services;
}
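// Consumption sketch (hypothetical caller): a DataNode walks the nested map
// of service id -> instance id -> address; today it holds a single entry.
//
//   for (Map<String, InetSocketAddress> instances :
//       getScmServiceRpcAddresses(conf).values()) {
//     for (InetSocketAddress addr : instances.values()) {
//       // connect to addr ...
//     }
//   }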
/**
* Checks that a given value is within a range.
*
* For example, sanitizeUserArgs(17, 3, 5, 10)
* ensures that 17 is greater than or equal to 3 * 5 and less than or
* equal to 3 * 10.
*
* @param valueToCheck - value to check
* @param baseValue - the base value that is being used.
* @param minFactor - range min - a 2 here ensures that valueToCheck is
* at least twice the baseValue.
* @param maxFactor - range max
* @return long
*/
private static long sanitizeUserArgs(long valueToCheck, long baseValue,
long minFactor, long maxFactor)
throws IllegalArgumentException {
if ((valueToCheck >= (baseValue * minFactor)) &&
(valueToCheck <= (baseValue * maxFactor))) {
return valueToCheck;
}
String errMsg = String.format("%d is not within the range [min = %d, " +
"max = %d]", valueToCheck, baseValue * minFactor,
baseValue * maxFactor);
throw new IllegalArgumentException(errMsg);
}
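// Worked example for the range check above: sanitizeUserArgs(17, 3, 5, 10)
// accepts 17 because 3 * 5 = 15 <= 17 <= 30 = 3 * 10, whereas
// sanitizeUserArgs(14, 3, 5, 10) throws IllegalArgumentException since
// 14 < 15.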
/**
* Returns the interval in which the heartbeat processor thread runs.
*
* @param conf - Configuration
* @return long in Milliseconds.
*/
public static long getScmheartbeatCheckerInterval(Configuration conf) {
return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
}
/**
* Heartbeat Interval - Defines the heartbeat frequency from a datanode to
* SCM.
*
* @param conf - Ozone Config
* @return - HB interval in seconds.
*/
public static long getScmHeartbeatInterval(Configuration conf) {
return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL,
ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_DEFAULT,
TimeUnit.SECONDS);
}
/**
* Get the Stale Node interval, which is used by SCM to flag a datanode as
* stale if the heartbeat from that node has been missing for this duration.
*
* @param conf - Configuration.
* @return - Long, Milliseconds to wait before flagging a node as stale.
*/
public static long getStaleNodeInterval(Configuration conf) {
long staleNodeIntervalMs =
conf.getTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
OZONE_SCM_STALENODE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf);
long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000;
// Make sure that StaleNodeInterval is configured way above the frequency
// at which we run the heartbeat thread.
//
// Here we check that staleNodeInterval is at least five times the
// frequency at which the accounting thread is going to run.
try {
sanitizeUserArgs(staleNodeIntervalMs, heartbeatThreadFrequencyMs,
5, 1000);
} catch (IllegalArgumentException ex) {
LOG.error("Stale Node Interval is cannot be honored due to " +
"mis-configured {}. ex: {}",
OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, ex);
throw ex;
}
// Make sure that the stale node interval is greater than the interval
// at which datanodes are configured to send HBs.
try {
sanitizeUserArgs(staleNodeIntervalMs, heartbeatIntervalMs, 3, 1000);
} catch (IllegalArgumentException ex) {
LOG.error("Stale Node Interval MS is cannot be honored due to " +
"mis-configured {}. ex: {}", OZONE_SCM_HEARTBEAT_INTERVAL, ex);
throw ex;
}
return staleNodeIntervalMs;
}
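// Configuration sketch that satisfies both checks above (values are
// illustrative, not defaults): a 1s heartbeat-processor interval and a 5s
// datanode heartbeat admit a 30s stale-node interval, because
// 30s >= 5 * 1s and 30s >= 3 * 5s.
//
//   conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1,
//       TimeUnit.SECONDS);
//   conf.setTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, 5, TimeUnit.SECONDS);
//   conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
//   long staleMs = getStaleNodeInterval(conf); // 30000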
/**
* Gets the interval for dead node flagging. This has to be a value that is
* greater than stale node value, and by transitive relation we also know
* that this value is greater than heartbeat interval and heartbeatProcess
* Interval.
*
* @param conf - Configuration.
* @return - the interval for dead node flagging.
*/
public static long getDeadNodeInterval(Configuration conf) {
long staleNodeIntervalMs = getStaleNodeInterval(conf);
long deadNodeIntervalMs = conf.getTimeDuration(OZONE_SCM_DEADNODE_INTERVAL,
OZONE_SCM_DEADNODE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
try {
// Make sure that the dead node interval is at least twice the stale
// node interval, with a max of 1000 times the stale node interval.
sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000);
} catch (IllegalArgumentException ex) {
LOG.error("Dead Node Interval MS is cannot be honored due to " +
"mis-configured {}. ex: {}", OZONE_SCM_STALENODE_INTERVAL, ex);
throw ex;
}
return deadNodeIntervalMs;
}
/**
* Returns the maximum number of heartbeats to process per loop of the
* heartbeat process thread.
* @param conf Configuration
* @return - int -- Number of HBs to process
*/
public static int getMaxHBToProcessPerLoop(Configuration conf) {
return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS,
ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT);
}
/**
* Timeout value for the RPC from Datanode to SCM, primarily used for
* Heartbeats and container reports.
*
* @param conf - Ozone Config
* @return - Rpc timeout in Milliseconds.
*/
public static long getScmRpcTimeOutInMilliseconds(Configuration conf) {
return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT,
OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
}
/**
* Heartbeat log warn interval.
*
* @param conf - Ozone Config
* @return - Heartbeat log warn interval count.
*/
public static int getLogWarnInterval(Configuration conf) {
return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT,
OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT);
}
/**
* Returns the container port.
* @param conf - Conf
* @return port number.
*/
public static int getContainerPort(Configuration conf) {
return conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
}
/**
* After starting an RPC server, updates configuration with the actual
* listening address of that server. The listening address may be different
* from the configured address if, for example, the configured address uses
* port 0 to request use of an ephemeral port.
*
* @param conf configuration to update
* @param rpcAddressKey configuration key for RPC server address
* @param addr configured address
* @param rpcServer started RPC server.
*/
public static InetSocketAddress updateRPCListenAddress(
OzoneConfiguration conf, String rpcAddressKey,
InetSocketAddress addr, RPC.Server rpcServer) {
return updateListenAddress(conf, rpcAddressKey, addr,
rpcServer.getListenerAddress());
}
/**
* After starting a server, updates configuration with the actual
* listening address of that server. The listening address may be different
* from the configured address if, for example, the configured address uses
* port 0 to request use of an ephemeral port.
*
* @param conf configuration to update
* @param addressKey configuration key for RPC server address
* @param addr configured address
* @param listenAddr the real listening address.
*/
public static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
String addressKey, InetSocketAddress addr, InetSocketAddress listenAddr) {
InetSocketAddress updatedAddr = new InetSocketAddress(addr.getHostString(),
listenAddr.getPort());
conf.set(addressKey,
addr.getHostString() + ":" + listenAddr.getPort());
return updatedAddr;
}
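// Ephemeral-port sketch (the key below is hypothetical): when the configured
// address requests port 0, the config is rewritten with the port the server
// actually bound, so later readers observe the real endpoint.
//
//   InetSocketAddress configured = NetUtils.createSocketAddr("0.0.0.0:0");
//   // suppose the server bound port 45678:
//   updateListenAddress(conf, "scm.client.address", configured,
//       rpcServer.getListenerAddress());
//   // conf now maps "scm.client.address" -> "0.0.0.0:45678"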
/**
* Releases an HTTP connection if the request is not null.
* @param request the request to release
*/
public static void releaseConnection(HttpRequestBase request) {
if (request != null) {
request.releaseConnection();
}
}
/**
* @return a default instance of {@link CloseableHttpClient}.
*/
public static CloseableHttpClient newHttpClient() {
return OzoneClientUtils.newHttpClient(new OzoneConfiguration());
}
/**
* Returns a {@link CloseableHttpClient} configured by given configuration.
* If conf is null, returns a default instance.
*
* @param conf configuration
* @return a {@link CloseableHttpClient} instance.
*/
public static CloseableHttpClient newHttpClient(Configuration conf) {
long socketTimeout = OzoneConfigKeys
.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT;
long connectionTimeout = OzoneConfigKeys
.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT;
if (conf != null) {
socketTimeout = conf.getTimeDuration(
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT,
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
connectionTimeout = conf.getTimeDuration(
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT,
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
}
CloseableHttpClient client = HttpClients.custom()
.setDefaultRequestConfig(
RequestConfig.custom()
.setSocketTimeout(Math.toIntExact(socketTimeout))
.setConnectTimeout(Math.toIntExact(connectionTimeout))
.build())
.build();
return client;
}
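// Usage sketch: the returned client must be closed by the caller; the URL
// below is hypothetical.
//
//   try (CloseableHttpClient client = newHttpClient(conf)) {
//     HttpGet get = new HttpGet("http://ksm.example.com:9874/serviceList");
//     try (CloseableHttpResponse resp = client.execute(get)) {
//       // read resp.getEntity() ...
//     }
//   }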
/**
* Verifies that a bucket or volume name is a valid DNS name.
*
* @param resName Bucket or volume Name to be validated
*
* @throws IllegalArgumentException if the name is invalid
*/
public static void verifyResourceName(String resName)
throws IllegalArgumentException {
if (resName == null) {
throw new IllegalArgumentException("Bucket or Volume name is null");
}
if ((resName.length() < OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH) ||
(resName.length() > OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH)) {
throw new IllegalArgumentException(
"Bucket or Volume length is illegal, " +
"valid length is 3-63 characters");
}
if ((resName.charAt(0) == '.') || (resName.charAt(0) == '-')) {
throw new IllegalArgumentException(
"Bucket or Volume name cannot start with a period or dash");
}
if ((resName.charAt(resName.length() - 1) == '.') ||
(resName.charAt(resName.length() - 1) == '-')) {
throw new IllegalArgumentException(
"Bucket or Volume name cannot end with a period or dash");
}
boolean isIPv4 = true;
char prev = (char) 0;
for (int index = 0; index < resName.length(); index++) {
char currChar = resName.charAt(index);
if (currChar != '.') {
isIPv4 = ((currChar >= '0') && (currChar <= '9')) && isIPv4;
}
if (currChar >= 'A' && currChar <= 'Z') {
throw new IllegalArgumentException(
"Bucket or Volume name does not support uppercase characters");
}
if ((currChar != '.') && (currChar != '-')) {
if ((currChar < '0') || (currChar > '9' && currChar < 'a') ||
(currChar > 'z')) {
throw new IllegalArgumentException("Bucket or Volume name has an " +
"unsupported character : " +
currChar);
}
}
if ((prev == '.') && (currChar == '.')) {
throw new IllegalArgumentException("Bucket or Volume name should not " +
"have two contiguous periods");
}
if ((prev == '-') && (currChar == '.')) {
throw new IllegalArgumentException(
"Bucket or Volume name should not have period after dash");
}
if ((prev == '.') && (currChar == '-')) {
throw new IllegalArgumentException(
"Bucket or Volume name should not have dash after period");
}
prev = currChar;
}
if (isIPv4) {
throw new IllegalArgumentException(
"Bucket or Volume name cannot be an IPv4 address or all numeric");
}
}
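// Examples of the rules enforced above (names are illustrative):
//
//   verifyResourceName("bucket-01");     // ok
//   verifyResourceName("Bucket");        // throws: uppercase not allowed
//   verifyResourceName("bu");            // throws: shorter than 3 chars
//   verifyResourceName("bucket..name");  // throws: contiguous periods
//   verifyResourceName("10.0.0.1");      // throws: IPv4-style / all numeric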
/**
* Convert a time in milliseconds to the human readable format used in
* Ozone.
* @param millis milliseconds since the epoch
* @return a human readable string for the input time
*/
public static String formatDateTime(long millis) {
ZonedDateTime dateTime = ZonedDateTime.ofInstant(
Instant.ofEpochMilli(millis), DATE_FORMAT.get().getZone());
return DATE_FORMAT.get().format(dateTime);
}
/**
* Convert a time in the Ozone date format back to milliseconds.
* @param date a date string in OZONE_DATE_FORMAT
* @return time in milliseconds
*/
public static long formatDateTime(String date) throws ParseException {
Preconditions.checkNotNull(date, "Date string should not be null.");
return ZonedDateTime.parse(date, DATE_FORMAT.get())
.toInstant().toEpochMilli();
}
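// Round-trip sketch for the two conversions above: formatting a timestamp
// and parsing it back yields the original value, truncated to the
// resolution of OZONE_DATE_FORMAT.
//
//   long now = System.currentTimeMillis();
//   String text = formatDateTime(now);
//   long parsed = formatDateTime(text); // == now, up to format resolution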
/**
* Returns the maximum number of outstanding async requests to be handled
* by the Standalone and Ratis clients.
*/
public static int getMaxOutstandingRequests(Configuration config) {
return config
.getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS,
ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT);
}
}

View File

@ -75,6 +75,7 @@ message DatanodeIDProto {
optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port
optional uint32 containerPort = 8 [default = 0]; // Ozone stand_alone protocol
optional uint32 ratisPort = 9 [default = 0]; //Ozone ratis port
optional uint32 ozoneRestPort = 10 [default = 0];
}
/**

View File

@ -46,7 +46,6 @@ function hadoop_usage
hadoop_add_subcommand "envvars" client "display computed Hadoop environment variables"
hadoop_add_subcommand "ec" admin "run a HDFS ErasureCoding CLI"
hadoop_add_subcommand "fetchdt" client "fetch a delegation token from the NameNode"
hadoop_add_subcommand "freon" client "runs an ozone data generator"
hadoop_add_subcommand "fsck" admin "run a DFS filesystem checking utility"
hadoop_add_subcommand "getconf" client "get config values from configuration"
hadoop_add_subcommand "groups" client "get the groups which users belong to"
@ -140,9 +139,6 @@ function hdfscmd_case
fetchdt)
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.DelegationTokenFetcher
;;
freon)
HADOOP_CLASSNAME=org.apache.hadoop.ozone.tools.Freon
;;
fsck)
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.DFSck
;;

View File

@ -17,10 +17,6 @@
MYNAME="${BASH_SOURCE-$0}"
## @description usage info
## @audience private
## @stability evolving
## @replaceable no
function hadoop_usage
{
hadoop_add_option "--buildpaths" "attempt to add class files from build tree"

View File

@ -17,10 +17,6 @@
MYNAME="${BASH_SOURCE-$0}"
## @description usage info
## @audience private
## @stability evolving
## @replaceable no
function hadoop_usage
{
hadoop_add_option "--buildpaths" "attempt to add class files from build tree"

View File

@ -36,8 +36,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED_DEFAULT;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@ -73,7 +71,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -1497,23 +1494,6 @@ static String getPassword(Configuration conf, String alias) {
return password;
}
public static boolean isOzoneEnabled(Configuration conf) {
String securityEnabled = conf.get(CommonConfigurationKeysPublic
.HADOOP_SECURITY_AUTHENTICATION,
"simple");
boolean securityAuthorizationEnabled = conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false);
if (securityEnabled.equals("kerberos") || securityAuthorizationEnabled) {
LOG.error("Ozone is not supported in a security enabled cluster. ");
return false;
} else {
return conf.getBoolean(OZONE_ENABLED,
OZONE_ENABLED_DEFAULT);
}
}
/**
* Converts a Date into an ISO-8601 formatted datetime string.
*/

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.ServicePlugin;
/**
* Datanode specific service plugin with additional hooks.
*/
public interface DataNodeServicePlugin extends ServicePlugin {
/**
* Extension point to modify the datanode id.
*
* @param dataNodeId the datanode id under construction
*/
default void onDatanodeIdCreation(DatanodeID dataNodeId) {
//NOOP
}
/**
* Extension point invoked after the datanode has successfully registered
* with the namenode.
*
* @param dataNodeId the registration returned by the namenode
*/
default void onDatanodeSuccessfulNamenodeRegistration(
DatanodeRegistration dataNodeId) {
//NOOP
}
}
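// Sketch of a plugin built on the hooks above. The class name and the
// setOzoneRestPort mutator are hypothetical, and ServicePlugin's
// start/stop/close methods are omitted for brevity:
//
//   public class RestPortPlugin implements DataNodeServicePlugin {
//     @Override
//     public void onDatanodeIdCreation(DatanodeID dataNodeId) {
//       // advertise an additional service port on the datanode id
//       dataNodeId.setOzoneRestPort(9880);
//     }
//   }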

View File

@ -43,7 +43,7 @@
* handler drops the request and immediately sends an HTTP 400 response.
*/
@InterfaceAudience.Private
final class RestCsrfPreventionFilterHandler
public final class RestCsrfPreventionFilterHandler
extends SimpleChannelInboundHandler<HttpRequest> {
private static final Log LOG = DatanodeHttpServer.LOG;

View File

@ -1,589 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.common.interfaces
.ContainerLocationManager;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.DigestInputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META;
/**
* A Generic ContainerManagerImpl that will be called from Ozone
* ContainerManagerImpl. This allows us to support delta changes to ozone
* version without having to rewrite the containerManager.
*/
public class ContainerManagerImpl implements ContainerManager {
static final Logger LOG =
LoggerFactory.getLogger(ContainerManagerImpl.class);
private final ConcurrentSkipListMap<String, ContainerStatus>
containerMap = new ConcurrentSkipListMap<>();
// This lock follows fair locking policy of first-come first-serve
// for waiting threads.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private ContainerLocationManager locationManager;
private ChunkManager chunkManager;
private KeyManager keyManager;
/**
* Init call that sets up a container Manager.
*
* @param config - Configuration.
* @param containerDirs - List of Metadata Container locations.
* @throws IOException
*/
@Override
public void init(
Configuration config, List<StorageLocation> containerDirs)
throws IOException {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(containerDirs);
Preconditions.checkState(containerDirs.size() > 0);
readLock();
try {
for (StorageLocation path : containerDirs) {
File directory = path.getFile();
if (!directory.isDirectory()) {
LOG.error("Invalid path to container metadata directory. path: {}",
path.toString());
throw new IOException("Invalid path to container metadata directory" +
". " + path);
}
File[] files = directory.listFiles(new ContainerFilter());
if (files != null) {
for (File containerFile : files) {
String containerPath =
ContainerUtils.getContainerNameFromFile(containerFile);
Preconditions.checkNotNull(containerPath);
readContainerInfo(containerPath);
}
}
}
List<StorageLocation> dataDirs = new LinkedList<>();
for (String dir : config.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
StorageLocation location = StorageLocation.parse(dir);
dataDirs.add(location);
}
this.locationManager =
new ContainerLocationManagerImpl(containerDirs, dataDirs, config);
} finally {
readUnlock();
}
}
/**
* Reads the Container Info from a file and verifies that the checksums
* match. If they do, the container is added to containerMap.
*
* @param containerName - Name which points to the persisted container.
*/
private void readContainerInfo(String containerName)
throws IOException {
Preconditions.checkState(containerName.length() > 0);
FileInputStream containerStream = null;
DigestInputStream dis = null;
FileInputStream metaStream = null;
Path cPath = Paths.get(containerName).getFileName();
String keyName = null;
if (cPath != null) {
keyName = cPath.toString();
}
Preconditions.checkNotNull(keyName);
try {
String containerFileName = containerName.concat(CONTAINER_EXTENSION);
String metaFileName = containerName.concat(CONTAINER_META);
containerStream = new FileInputStream(containerFileName);
metaStream = new FileInputStream(metaFileName);
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
dis = new DigestInputStream(containerStream, sha);
ContainerData containerData = ContainerData.getFromProtBuf(
ContainerProtos.ContainerData.parseDelimitedFrom(dis));
ContainerProtos.ContainerMeta meta = ContainerProtos.ContainerMeta
.parseDelimitedFrom(metaStream);
if (meta != null && !DigestUtils.sha256Hex(sha.digest()).equals(meta
.getHash())) {
throw new IOException("Invalid SHA found for file.");
}
containerMap.put(keyName, new ContainerStatus(containerData, true));
} catch (IOException | NoSuchAlgorithmException ex) {
LOG.error("read failed for file: {} ex: {}",
containerName, ex.getMessage());
// TODO : Add this file to a recovery Queue.
// Remember that this container is busted and we cannot use it.
containerMap.put(keyName, new ContainerStatus(null, false));
} finally {
IOUtils.closeStream(dis);
IOUtils.closeStream(containerStream);
IOUtils.closeStream(metaStream);
}
}
/**
* Creates a container with the given name.
*
* @param pipeline - Nodes which make up this container.
* @param containerData - Container Name and metadata.
* @throws IOException
*/
@Override
public void createContainer(Pipeline pipeline, ContainerData containerData)
throws IOException {
Preconditions.checkNotNull(containerData);
writeLock();
try {
if (containerMap.containsKey(containerData.getName())) {
throw new FileAlreadyExistsException("container already exists.");
}
// This is by design. We first write and close the
// container Info and metadata to a directory.
// Then read back and put that info into the containerMap.
// This allows us to make sure that our write is consistent.
writeContainerInfo(containerData);
File cFile = new File(containerData.getContainerPath());
readContainerInfo(ContainerUtils.getContainerNameFromFile(cFile));
} catch (NoSuchAlgorithmException ex) {
throw new IOException("failed to create container", ex);
} finally {
writeUnlock();
}
}
/**
* Writes a container to a chosen location and updates the container Map.
*
* The file formats of ContainerData and ContainerMeta are the following.
*
* message ContainerData {
* required string name = 1;
* repeated KeyValue metadata = 2;
* optional string dbPath = 3;
* optional string containerPath = 4;
* }
*
* message ContainerMeta {
* required string fileName = 1;
* required string hash = 2;
* }
*
* @param containerData - container Data
*/
private void writeContainerInfo(ContainerData containerData)
throws IOException, NoSuchAlgorithmException {
Preconditions.checkNotNull(this.locationManager);
FileOutputStream containerStream = null;
DigestOutputStream dos = null;
FileOutputStream metaStream = null;
Path location = locationManager.getContainerPath();
File containerFile = ContainerUtils.getContainerFile(containerData,
location);
File metadataFile = ContainerUtils.getMetadataFile(containerData, location);
try {
ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
Path metadataPath = this.locationManager.getDataPath(
containerData.getContainerName());
metadataPath = ContainerUtils.createMetadata(metadataPath);
containerStream = new FileOutputStream(containerFile);
metaStream = new FileOutputStream(metadataFile);
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
dos = new DigestOutputStream(containerStream, sha);
containerData.setDBPath(metadataPath.resolve(OzoneConsts.CONTAINER_DB)
.toString());
containerData.setContainerPath(containerFile.toString());
ContainerProtos.ContainerData protoData = containerData
.getProtoBufMessage();
protoData.writeDelimitedTo(dos);
ContainerProtos.ContainerMeta protoMeta = ContainerProtos
.ContainerMeta.newBuilder()
.setFileName(containerFile.toString())
.setHash(DigestUtils.sha256Hex(sha.digest()))
.build();
protoMeta.writeDelimitedTo(metaStream);
} catch (IOException ex) {
// TODO : we need to clean up partially constructed files
// The proper way to do this would be for a thread
// to read all these 3 artifacts and make sure they are
// sane. That info needs to come from the replication
// pipeline, and if not consistent, delete these files.
// In case of ozone this is *not* a deal breaker since
// SCM is guaranteed to generate unique container names.
LOG.error("creation of container failed. Name: {} ",
containerData.getContainerName());
throw ex;
} finally {
IOUtils.closeStream(dos);
IOUtils.closeStream(containerStream);
IOUtils.closeStream(metaStream);
}
}
/**
* Deletes an existing container.
*
* @param pipeline - nodes that make this container.
* @param containerName - name of the container.
* @throws IOException
*/
@Override
public void deleteContainer(Pipeline pipeline, String containerName) throws
IOException {
Preconditions.checkState(containerName.length() > 0);
writeLock();
try {
ContainerStatus status = containerMap.get(containerName);
if (status == null) {
LOG.info("No such container. Name: {}", containerName);
throw new IOException("No such container. Name : " + containerName);
}
ContainerUtils.removeContainer(status.containerData);
containerMap.remove(containerName);
} finally {
writeUnlock();
}
}
/**
* A simple interface for container Iterations.
* <p/>
* This call makes no guarantees about consistency of the data between
* different list calls. It just returns the best known data at that point
* in time. It is possible that using this iteration you can miss certain
* containers from the listing.
*
* @param prefix - Return keys that match this prefix.
* @param count - how many to return
* @param prevKey - Previous Key Value or empty String.
* @param data - Actual containerData
* @throws IOException
*/
@Override
public void listContainer(String prefix, long count, String prevKey,
List<ContainerData> data) throws IOException {
// TODO : Support list with Prefix and PrevKey
Preconditions.checkNotNull(data);
readLock();
try {
ConcurrentNavigableMap<String, ContainerStatus> map = null;
if (prevKey == null || prevKey.isEmpty()) {
map = containerMap.tailMap(containerMap.firstKey(), true);
} else {
map = containerMap.tailMap(prevKey, false);
}
int currentCount = 0;
for (ContainerStatus entry : map.values()) {
if (currentCount < count) {
data.add(entry.getContainer());
currentCount++;
} else {
return;
}
}
} finally {
readUnlock();
}
}
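// Pagination sketch (hypothetical caller): page through the container map
// by passing the last key of the previous batch as prevKey.
//
//   List<ContainerData> batch = new LinkedList<>();
//   manager.listContainer(null, 100, "", batch);        // first page
//   String last = batch.get(batch.size() - 1).getName();
//   batch.clear();
//   manager.listContainer(null, 100, last, batch);      // next page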
/**
* Get metadata about a specific container.
*
* @param containerName - Name of the container
* @return ContainerData - Container Data.
* @throws IOException
*/
@Override
public ContainerData readContainer(String containerName) throws IOException {
if(!containerMap.containsKey(containerName)) {
throw new IOException("Unable to find the container. Name: "
+ containerName);
}
return containerMap.get(containerName).getContainer();
}
/**
* Supports clean shutdown of container.
*
* @throws IOException
*/
@Override
public void shutdown() throws IOException {
Preconditions.checkState(this.hasWriteLock());
this.containerMap.clear();
this.locationManager.shutdown();
}
@VisibleForTesting
public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
return containerMap;
}
/**
* Acquire read lock.
*/
@Override
public void readLock() {
this.lock.readLock().lock();
}
/**
* Release read lock.
*/
@Override
public void readUnlock() {
this.lock.readLock().unlock();
}
/**
* Check if the current thread holds read lock.
*/
@Override
public boolean hasReadLock() {
return this.lock.readLock().tryLock();
}
/**
* Acquire write lock.
*/
@Override
public void writeLock() {
this.lock.writeLock().lock();
}
/**
* Acquire write lock, unless interrupted while waiting.
*/
@Override
public void writeLockInterruptibly() throws InterruptedException {
this.lock.writeLock().lockInterruptibly();
}
/**
* Release write lock.
*/
@Override
public void writeUnlock() {
this.lock.writeLock().unlock();
}
/**
* Check if the current thread holds write lock.
*/
@Override
public boolean hasWriteLock() {
return this.lock.writeLock().isHeldByCurrentThread();
}
/**
* Sets the chunk Manager.
* @param chunkManager
*/
public void setChunkManager(ChunkManager chunkManager) {
this.chunkManager = chunkManager;
}
public ChunkManager getChunkManager() {
return this.chunkManager;
}
/**
* Sets the Key Manager.
*
* @param keyManager - Key Manager.
*/
@Override
public void setKeyManager(KeyManager keyManager) {
this.keyManager = keyManager;
}
/**
* Gets the Key Manager.
*
* @return KeyManager.
*/
@Override
public KeyManager getKeyManager() {
return this.keyManager;
}
/**
* Get the node report.
* @return node report.
*/
@Override
public SCMNodeReport getNodeReport() throws IOException {
StorageLocationReport[] reports = locationManager.getLocationReport();
SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
for (int i = 0; i < reports.length; i++) {
SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
nrb.addStorageReport(i, srb.setStorageUuid(reports[i].getId())
.setCapacity(reports[i].getCapacity())
.setScmUsed(reports[i].getScmUsed())
.setRemaining(reports[i].getRemaining())
.build());
}
return nrb.build();
}
/**
* Filter out only container files from the container metadata dir.
*/
private static class ContainerFilter implements FilenameFilter {
/**
* Tests if a specified file should be included in a file list.
*
* @param dir the directory in which the file was found.
* @param name the name of the file.
* @return <code>true</code> if and only if the name should be included in
* the file list; <code>false</code> otherwise.
*/
@Override
public boolean accept(File dir, String name) {
return name.endsWith(CONTAINER_EXTENSION);
}
}
/**
* This is an immutable class that represents the state of a container. If
* reading the container encountered an error at boot time, we will post
* that info to a recovery queue and keep the info in the containerMap.
* <p/>
* If and when the issue is fixed, the expectation is that the recovery
* thread will delete this entry from the containerMap and insert a new
* entry instead of modifying this class.
*/
@VisibleForTesting
static class ContainerStatus {
private final ContainerData containerData;
private final boolean active;
/**
* Creates a Container Status class.
*
* @param containerData - ContainerData.
* @param active - Active or not active.
*/
ContainerStatus(ContainerData containerData, boolean active) {
this.containerData = containerData;
this.active = active;
}
/**
* Returns container if it is active. It is not active if we have had an
* error and we are waiting for the background threads to fix the issue.
*
* @return ContainerData.
*/
public ContainerData getContainer() {
if (active) {
return containerData;
}
return null;
}
/**
* Indicates if a container is active.
*
* @return true if the container is active.
*/
public boolean isActive() {
return active;
}
}
}

View File

@ -18,11 +18,11 @@
(function () {
"use strict";
var data = {ozone: {enabled: false}};
var data = {};
dust.loadSource(dust.compile($('#tmpl-dn').html(), 'dn'));
function loadDatanodeInfo() {
function load() {
$.get('/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo', function(resp) {
data.dn = workaround(resp.beans[0]);
data.dn.HostName = resp.beans[0]['DatanodeHostname'];
@ -30,26 +30,6 @@
}).fail(show_err_msg);
}
function loadOzoneScmInfo() {
$.get('/jmx?qry=Hadoop:service=OzoneDataNode,name=SCMConnectionManager', function (resp) {
if (resp.beans.length > 0) {
data.ozone.SCMServers = resp.beans[0].SCMServers;
data.ozone.enabled = true;
render();
}
}).fail(show_err_msg);
}
function loadOzoneStorageInfo() {
$.get('/jmx?qry=Hadoop:service=OzoneDataNode,name=ContainerLocationManager', function (resp) {
if (resp.beans.length > 0) {
data.ozone.LocationReport = resp.beans[0].LocationReport;
data.ozone.enabled = true;
render();
}
}).fail(show_err_msg);
}
function workaround(dn) {
function node_map_to_array(nodes) {
var res = [];
@ -85,8 +65,6 @@
$('#alert-panel').show();
}
loadDatanodeInfo();
loadOzoneScmInfo();
loadOzoneStorageInfo();
load();
})();

View File

@ -38,10 +38,7 @@
'fmt_percentage': function (v) {
return Math.round(v * 100) / 100 + '%';
},
'elapsed': function(v) {
//elapsed sec from epoch sec
return Date.now() - v * 1000;
},
'fmt_time': function (v) {
var s = Math.floor(v / 1000), h = Math.floor(s / 3600);
s -= h * 3600;

View File

@ -52,10 +52,8 @@ public class TestNameNodeHttpServer {
@Parameters
public static Collection<Object[]> policy() {
Object[][] params = new Object[][] {
{HttpConfig.Policy.HTTP_ONLY },
{HttpConfig.Policy.HTTPS_ONLY },
{HttpConfig.Policy.HTTP_AND_HTTPS } };
Object[][] params = new Object[][] { { HttpConfig.Policy.HTTP_ONLY },
{ HttpConfig.Policy.HTTPS_ONLY }, { HttpConfig.Policy.HTTP_AND_HTTPS } };
return Arrays.asList(params);
}
@ -118,9 +116,8 @@ public void testHttpPolicy() throws Exception {
}
private static boolean canAccess(String scheme, InetSocketAddress addr) {
if (addr == null) {
if (addr == null)
return false;
}
try {
URL url = new URL(scheme + "://" + NetUtils.getHostPortString(addr));
URLConnection conn = connectionFactory.openConnection(url);

View File

@ -442,13 +442,13 @@ public void testPipelineRecoveryStress() throws Exception {
};
for (String[] scmd: scmds) {
String scmdStr = StringUtils.join(" ", scmd);
String scmd_str = StringUtils.join(" ", scmd);
try {
ShellCommandExecutor sce = new ShellCommandExecutor(scmd);
sce.execute();
LOG.info("'" + scmdStr + "' output:\n" + sce.getOutput());
LOG.info("'" + scmd_str + "' output:\n" + sce.getOutput());
} catch (IOException e) {
LOG.warn("Error when running '" + scmdStr + "'", e);
LOG.warn("Error when running '" + scmd_str + "'", e);
}
}

View File

@ -1,221 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
/**
* Test Endpoint class.
*/
public final class SCMTestUtils {
/**
* Never constructed.
*/
private SCMTestUtils() {
}
/**
* Creates an RPC server; the caller is responsible for starting it.
*
* @param conf configuration
* @param addr configured address of RPC server
* @param protocol RPC protocol provided by RPC server
* @param instance RPC protocol implementation instance
* @param handlerCount RPC server handler count
* @return RPC server
* @throws IOException if there is an I/O error while creating RPC server
*/
private static RPC.Server startRpcServer(Configuration conf,
InetSocketAddress addr, Class<?>
protocol, BlockingService instance, int handlerCount)
throws IOException {
RPC.Server rpcServer = new RPC.Builder(conf)
.setProtocol(protocol)
.setInstance(instance)
.setBindAddress(addr.getHostString())
.setPort(addr.getPort())
.setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(null)
.build();
DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
return rpcServer;
}
/**
* Creates an Endpoint class for testing purpose.
*
* @param conf - Conf
* @param address - InetSocketAddress of the endpoint
* @param rpcTimeout - rpcTimeOut
* @return EndPoint
* @throws Exception
*/
public static EndpointStateMachine createEndpoint(Configuration conf,
InetSocketAddress address, int rpcTimeout) throws Exception {
RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
long version =
RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
StorageContainerDatanodeProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), rpcTimeout,
RetryPolicies.TRY_ONCE_THEN_FAIL).getProxy();
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
return new EndpointStateMachine(address, rpcClient, conf);
}
/**
* Start Datanode RPC server.
*/
public static RPC.Server startScmRpcServer(Configuration configuration,
StorageContainerDatanodeProtocol server,
InetSocketAddress rpcServerAddress, int handlerCount) throws
IOException {
RPC.setProtocolEngine(configuration,
StorageContainerDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
BlockingService scmDatanodeService =
StorageContainerDatanodeProtocolService.
newReflectiveBlockingService(
new StorageContainerDatanodeProtocolServerSideTranslatorPB(
server));
RPC.Server scmServer = startRpcServer(configuration, rpcServerAddress,
StorageContainerDatanodeProtocolPB.class, scmDatanodeService,
handlerCount);
scmServer.start();
return scmServer;
}
public static InetSocketAddress getReuseableAddress() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
socket.setReuseAddress(true);
int port = socket.getLocalPort();
String addr = InetAddress.getLoopbackAddress().getHostAddress();
return new InetSocketAddress(addr, port);
}
}
public static Configuration getConf() {
return new Configuration();
}
public static OzoneConfiguration getOzoneConf() {
return new OzoneConfiguration();
}
public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
return getDatanodeID(nodeManager, UUID.randomUUID().toString());
}
/**
* Create a new DatanodeID with NodeID set to the string.
*
* @param uuid - node ID, it is generally UUID.
* @return DatanodeID.
*/
public static DatanodeID getDatanodeID(SCMNodeManager nodeManager, String
uuid) {
DatanodeID tempDataNode = getDatanodeID(uuid);
RegisteredCommand command =
(RegisteredCommand) nodeManager.register(tempDataNode);
return new DatanodeID(command.getDatanodeUUID(), tempDataNode);
}
/**
* Get the specified number of datanode IDs and register them with the
* node manager.
* @param nodeManager - node manager to register the datanode ids.
* @param count - number of datanode IDs needed.
* @return list of registered DatanodeIDs.
*/
public static List<DatanodeID> getRegisteredDatanodeIDs(
SCMNodeManager nodeManager, int count) {
ArrayList<DatanodeID> datanodes = new ArrayList<>();
for (int i = 0; i < count; i++) {
datanodes.add(getDatanodeID(nodeManager));
}
return datanodes;
}
/**
* Get the specified number of datanode IDs.
* @param count - number of datanode IDs needed.
* @return list of DatanodeIDs.
*/
public static List<DatanodeID> getDatanodeIDs(int count) {
ArrayList<DatanodeID> datanodes = new ArrayList<>();
for (int i = 0; i < count; i++) {
datanodes.add(getDatanodeID());
}
return datanodes;
}
/**
* Get a datanode ID.
*
* @return DatanodeID
*/
public static DatanodeID getDatanodeID() {
return getDatanodeID(UUID.randomUUID().toString());
}
private static DatanodeID getDatanodeID(String uuid) {
Random random = new Random();
String ipAddress = random.nextInt(256) + "."
+ random.nextInt(256) + "."
+ random.nextInt(256) + "."
+ random.nextInt(256);
String hostName = uuid;
return new DatanodeID(ipAddress,
hostName, uuid, 0, 0, 0, 0);
}
}

View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdsl</artifactId>
<version>3.2.0-SNAPSHOT</version>
</parent>
<artifactId>hadoop-hdsl-client</artifactId>
<version>3.2.0-SNAPSHOT</version>
<description>Apache Hadoop HDSL Client libraries</description>
<name>Apache Hadoop HDSL Client</name>
<packaging>jar</packaging>
<properties>
<hadoop.component>hdsl</hadoop.component>
<is.hadoop.component>true</is.hadoop.component>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdsl-common</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client;
import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.ScmConfigKeys;
import com.google.common.base.Preconditions;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility methods for Ozone and Container Clients.
*
* The methods to retrieve SCM service endpoints assume there is a single
* SCM service instance. This will change when we switch to replicated service
* instances for redundancy.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class OzoneClientUtils {
private static final Logger LOG = LoggerFactory.getLogger(
OzoneClientUtils.class);
private static final int NO_PORT = -1;
private OzoneClientUtils() {
}
/**
 * Date format used in Ozone. {@link DateTimeFormatter} is immutable and
 * thread-safe, so the shared formatter below is safe to use from any thread.
 */
private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT =
ThreadLocal.withInitial(() -> {
DateTimeFormatter format =
DateTimeFormatter.ofPattern(OzoneConsts.OZONE_DATE_FORMAT);
return format.withZone(ZoneId.of(OzoneConsts.OZONE_TIME_ZONE));
});
/**
 * Returns the cache size to be used for list calls.
 * @param conf Configuration object
 * @return list cache size
 */
public static int getListCacheSize(Configuration conf) {
return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE,
OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT);
}
/**
* @return a default instance of {@link CloseableHttpClient}.
*/
public static CloseableHttpClient newHttpClient() {
return OzoneClientUtils.newHttpClient(new OzoneConfiguration());
}
/**
* Returns a {@link CloseableHttpClient} configured by the given configuration.
* If conf is null, returns a default instance.
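* The caller is responsible for closing the returned client.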
*
* @param conf configuration
* @return a {@link CloseableHttpClient} instance.
*/
public static CloseableHttpClient newHttpClient(Configuration conf) {
long socketTimeout = OzoneConfigKeys
.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT;
long connectionTimeout = OzoneConfigKeys
.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT;
if (conf != null) {
socketTimeout = conf.getTimeDuration(
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT,
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
connectionTimeout = conf.getTimeDuration(
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT,
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
}
CloseableHttpClient client = HttpClients.custom()
.setDefaultRequestConfig(
RequestConfig.custom()
.setSocketTimeout(Math.toIntExact(socketTimeout))
.setConnectTimeout(Math.toIntExact(connectionTimeout))
.build())
.build();
return client;
}
/**
 * Verifies that a bucket or volume name is a valid DNS-compatible name.
 *
 * @param resName bucket or volume name to be validated
 *
 * @throws IllegalArgumentException if the name is null or violates the
 * naming rules below
 */
public static void verifyResourceName(String resName)
throws IllegalArgumentException {
if (resName == null) {
throw new IllegalArgumentException("Bucket or Volume name is null");
}
if ((resName.length() < OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH) ||
(resName.length() > OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH)) {
throw new IllegalArgumentException(
"Bucket or Volume name length is illegal, " +
"valid length is 3-63 characters");
}
if ((resName.charAt(0) == '.') || (resName.charAt(0) == '-')) {
throw new IllegalArgumentException(
"Bucket or Volume name cannot start with a period or dash");
}
if ((resName.charAt(resName.length() - 1) == '.') ||
(resName.charAt(resName.length() - 1) == '-')) {
throw new IllegalArgumentException(
"Bucket or Volume name cannot end with a period or dash");
}
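// Tracks whether every character other than '.' is a digit; if the flag
// survives the loop, the name looks like an IPv4 address (or is all
// numeric) and is rejected after the loop.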
boolean isIPv4 = true;
char prev = (char) 0;
for (int index = 0; index < resName.length(); index++) {
char currChar = resName.charAt(index);
if (currChar != '.') {
isIPv4 = ((currChar >= '0') && (currChar <= '9')) && isIPv4;
}
if (currChar >= 'A' && currChar <= 'Z') {
throw new IllegalArgumentException(
"Bucket or Volume name does not support uppercase characters");
}
if ((currChar != '.') && (currChar != '-')) {
if ((currChar < '0') || (currChar > '9' && currChar < 'a') ||
(currChar > 'z')) {
throw new IllegalArgumentException("Bucket or Volume name has an " +
"unsupported character : " +
currChar);
}
}
if ((prev == '.') && (currChar == '.')) {
throw new IllegalArgumentException("Bucket or Volume name should not " +
"have two contiguous periods");
}
if ((prev == '-') && (currChar == '.')) {
throw new IllegalArgumentException(
"Bucket or Volume name should not have period after dash");
}
if ((prev == '.') && (currChar == '-')) {
throw new IllegalArgumentException(
"Bucket or Volume name should not have dash after period");
}
prev = currChar;
}
if (isIPv4) {
throw new IllegalArgumentException(
"Bucket or Volume name cannot be an IPv4 address or all numeric");
}
}
/**
 * Converts a time in milliseconds to the human-readable date format used in
 * Ozone.
 * @param millis time in epoch milliseconds
 * @return a human readable string for the input time
 */
public static String formatDateTime(long millis) {
ZonedDateTime dateTime = ZonedDateTime.ofInstant(
Instant.ofEpochMilli(millis), DATE_FORMAT.get().getZone());
return DATE_FORMAT.get().format(dateTime);
}
/**
 * Converts a date string in the Ozone date format back to milliseconds.
 * @param date date string in the Ozone date format
 * @return time in epoch milliseconds
 */
public static long formatDateTime(String date) throws ParseException {
Preconditions.checkNotNull(date, "Date string should not be null.");
return ZonedDateTime.parse(date, DATE_FORMAT.get())
.toInstant().toEpochMilli();
}
/**
 * Returns the maximum number of outstanding async requests to be handled by
 * the Standalone and Ratis clients.
 */
public static int getMaxOutstandingRequests(Configuration config) {
return config
.getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS,
ScmConfigKeys
.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT);
}
}
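
A minimal usage sketch of the helpers above. Only names that appear in this file are used; the wrapper class itself is illustrative, not part of the commit:

import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.http.impl.client.CloseableHttpClient;

public final class OzoneClientUtilsExample {
  private OzoneClientUtilsExample() { }

  public static void main(String[] args) throws Exception {
    // Throws IllegalArgumentException for names that break the DNS rules.
    OzoneClientUtils.verifyResourceName("my-bucket-01");

    // Round-trip a timestamp through the Ozone date format.
    String formatted = OzoneClientUtils.formatDateTime(
        System.currentTimeMillis());
    long parsedMillis = OzoneClientUtils.formatDateTime(formatted);
    System.out.println(formatted + " -> " + parsedMillis);

    // The returned client is Closeable; close it when done.
    try (CloseableHttpClient http =
        OzoneClientUtils.newHttpClient(new OzoneConfiguration())) {
      // issue requests with http ...
    }
  }
}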

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client;
/**
 * Generic helper classes for the client side of HDSL workflows.
 */

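The remaining hunks in this commit repeat one mechanical rename. Because this rendering drops the diff's - and + markers, read each pair of near-identical lines as before/after. As a sketch of the pattern (the commented lines are the removed imports):

// import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
// import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;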
@ -28,14 +28,14 @@
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@ -186,7 +186,7 @@ public void createPipeline(String pipelineID, List<DatanodeID> datanodes)
* @return - Stand Alone as the type.
*/
@Override
public OzoneProtos.ReplicationType getPipelineType() {
return OzoneProtos.ReplicationType.STAND_ALONE;
public HdslProtos.ReplicationType getPipelineType() {
return HdslProtos.ReplicationType.STAND_ALONE;
}
}

@ -23,8 +23,8 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;

@ -24,7 +24,7 @@
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.util.concurrent.Semaphore;

@ -31,7 +31,7 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import static org.apache.hadoop.scm.ScmConfigKeys
@ -42,7 +42,7 @@
.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos
.ReplicationType.RATIS;
/**
@ -186,24 +186,24 @@ public boolean isUseRatis() {
* Returns hard coded 3 as replication factor.
* @return 3
*/
public OzoneProtos.ReplicationFactor getFactor() {
public HdslProtos.ReplicationFactor getFactor() {
if(isUseRatis()) {
return OzoneProtos.ReplicationFactor.THREE;
return HdslProtos.ReplicationFactor.THREE;
}
return OzoneProtos.ReplicationFactor.ONE;
return HdslProtos.ReplicationFactor.ONE;
}
/**
* Returns the default replication type.
* @return Ratis or Standalone
*/
public OzoneProtos.ReplicationType getType() {
public HdslProtos.ReplicationType getType() {
// TODO : Fix me and make Ratis default before release.
// TODO: Remove this as replication factor and type are pipeline properties
if(isUseRatis()) {
return OzoneProtos.ReplicationType.RATIS;
return HdslProtos.ReplicationType.RATIS;
}
return OzoneProtos.ReplicationType.STAND_ALONE;
return HdslProtos.ReplicationType.STAND_ALONE;
}
/**

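For orientation, a minimal sketch of how callers consume the renamed enums after this change. The wrapper class is illustrative; the logic mirrors the getType()/getFactor() bodies shown above:

import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;

final class ReplicationDefaults {
  // Mirrors getType() above: Ratis when enabled, standalone otherwise.
  static HdslProtos.ReplicationType type(boolean useRatis) {
    return useRatis ? HdslProtos.ReplicationType.RATIS
        : HdslProtos.ReplicationType.STAND_ALONE;
  }

  // Mirrors getFactor() above: three replicas with Ratis, one standalone.
  static HdslProtos.ReplicationFactor factor(boolean useRatis) {
    return useRatis ? HdslProtos.ReplicationFactor.THREE
        : HdslProtos.ReplicationFactor.ONE;
  }
}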
@ -18,7 +18,7 @@
package org.apache.hadoop.scm;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;

@ -21,11 +21,11 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
@ -98,8 +98,8 @@ public void createPipeline(String clusterId, List<DatanodeID> datanodes)
* @return - Ratis
*/
@Override
public OzoneProtos.ReplicationType getPipelineType() {
return OzoneProtos.ReplicationType.RATIS;
public HdslProtos.ReplicationType getPipelineType() {
return HdslProtos.ReplicationType.RATIS;
}
private void reinitialize(List<DatanodeID> datanodes, RaftGroup group)

@ -18,10 +18,10 @@
package org.apache.hadoop.scm.client;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
@ -37,8 +37,8 @@
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.ALLOCATED;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.OPEN;
import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState.ALLOCATED;
import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState.OPEN;
/**
* This class provides the client-facing APIs of container operations.
@ -189,8 +189,8 @@ private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
* @inheritDoc
*/
@Override
public Pipeline createContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor,
public Pipeline createContainer(HdslProtos.ReplicationType type,
HdslProtos.ReplicationFactor factor,
String containerId, String owner) throws IOException {
XceiverClientSpi client = null;
try {
@ -229,8 +229,8 @@ public Pipeline createContainer(OzoneProtos.ReplicationType type,
* @throws IOException
*/
@Override
public OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState>
nodeStatuses, OzoneProtos.QueryScope queryScope, String poolName)
public HdslProtos.NodePool queryNode(EnumSet<HdslProtos.NodeState>
nodeStatuses, HdslProtos.QueryScope queryScope, String poolName)
throws IOException {
return storageContainerLocationClient.queryNode(nodeStatuses, queryScope,
poolName);
@ -240,8 +240,8 @@ public OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState>
* Creates a specified replication pipeline.
*/
@Override
public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
public Pipeline createReplicationPipeline(HdslProtos.ReplicationType type,
HdslProtos.ReplicationFactor factor, HdslProtos.NodePool nodePool)
throws IOException {
return storageContainerLocationClient.createReplicationPipeline(type,
factor, nodePool);
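
Putting the renamed types together, a hedged sketch of a caller of the container APIs above. The ScmClient interface name is an assumption (this hunk only shows @Override implementations); the createContainer signature is taken from the diff:

import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
import org.apache.hadoop.scm.client.ScmClient; // interface name assumed
import org.apache.hadoop.scm.container.common.helpers.Pipeline;

public final class CreateContainerExample {
  private CreateContainerExample() { }

  // Creates a standalone, single-replica container via the API shown above.
  public static Pipeline create(ScmClient client, String owner)
      throws IOException {
    return client.createContainer(
        HdslProtos.ReplicationType.STAND_ALONE,
        HdslProtos.ReplicationFactor.ONE,
        UUID.randomUUID().toString(), // containerId
        owner);
  }
}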

Some files were not shown because too many files have changed in this diff.