HDDS-1497. Refactor blockade Tests. Contributed by Nilotpal Nandi.
This commit is contained in:
parent
8c8cb2d6aa
commit
1b041d4fd4
@ -18,9 +18,8 @@
|
||||
"""This module has apis to create and remove a blockade cluster"""
|
||||
|
||||
from subprocess import call
|
||||
import subprocess
|
||||
import logging
|
||||
import random
|
||||
import util
|
||||
from clusterUtils.cluster_utils import ClusterUtils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -39,23 +38,13 @@ def blockade_up(cls):
|
||||
|
||||
@classmethod
|
||||
def blockade_status(cls):
|
||||
exit_code, output = ClusterUtils.run_cmd("blockade status")
|
||||
exit_code, output = util.run_cmd("blockade status")
|
||||
return exit_code, output
|
||||
|
||||
@classmethod
|
||||
def make_flaky(cls, flaky_node, container_list):
|
||||
# make the network flaky
|
||||
om, scm, _, datanodes = \
|
||||
ClusterUtils.find_om_scm_client_datanodes(container_list)
|
||||
node_dict = {
|
||||
"all": "--all",
|
||||
"scm" : scm[0],
|
||||
"om" : om[0],
|
||||
"datanode": random.choice(datanodes)
|
||||
}[flaky_node]
|
||||
logger.info("flaky node: %s", node_dict)
|
||||
|
||||
output = call(["blockade", "flaky", node_dict])
|
||||
def make_flaky(cls, flaky_node):
|
||||
logger.info("flaky node: %s", flaky_node)
|
||||
output = call(["blockade", "flaky", flaky_node])
|
||||
assert output == 0, "flaky command failed with exit code=[%s]" % output
|
||||
|
||||
@classmethod
|
||||
@ -69,7 +58,7 @@ def blockade_create_partition(cls, *args):
|
||||
for node_list in args:
|
||||
nodes = nodes + ','.join(node_list) + " "
|
||||
exit_code, output = \
|
||||
ClusterUtils.run_cmd("blockade partition %s" % nodes)
|
||||
util.run_cmd("blockade partition %s" % nodes)
|
||||
assert exit_code == 0, \
|
||||
"blockade partition command failed with exit code=[%s]" % output
|
||||
|
||||
@ -96,3 +85,8 @@ def blockade_start(cls, node, all_nodes=False):
|
||||
output = call(["blockade", "start", node])
|
||||
assert output == 0, "blockade start command failed with " \
|
||||
"exit code=[%s]" % output
|
||||
|
||||
@classmethod
|
||||
def blockade_add(cls, node):
|
||||
output = call(["blockade", "add", node])
|
||||
assert output == 0, "blockade add command failed"
|
@ -17,6 +17,7 @@
|
||||
|
||||
|
||||
from subprocess import call
|
||||
|
||||
import subprocess
|
||||
import logging
|
||||
import time
|
||||
@ -292,9 +293,15 @@ def get_key(cls, docker_compose_file, bucket_name, volume_name,
|
||||
assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output
|
||||
|
||||
@classmethod
|
||||
def find_checksum(cls, docker_compose_file, filepath):
|
||||
def find_checksum(cls, docker_compose_file, filepath, client="ozone_client"):
|
||||
"""
|
||||
This function finds the checksum of a file present in a docker container.
|
||||
Before running any 'putKey' operation, this function is called to store
|
||||
the original checksum of the file. The file is then uploaded as a key.
|
||||
"""
|
||||
command = "docker-compose -f %s " \
|
||||
"exec ozone_client md5sum %s" % (docker_compose_file, filepath)
|
||||
"exec %s md5sum %s" % \
|
||||
(docker_compose_file, client, filepath)
|
||||
exit_code, output = cls.run_cmd(command)
|
||||
assert exit_code == 0, "Cant find checksum"
|
||||
myoutput = output.split("\n")
|
||||
|
14
hadoop-ozone/dist/src/main/blockade/ozone/__init__.py
vendored
Normal file
14
hadoop-ozone/dist/src/main/blockade/ozone/__init__.py
vendored
Normal file
@ -0,0 +1,14 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
295
hadoop-ozone/dist/src/main/blockade/ozone/cluster.py
vendored
Normal file
295
hadoop-ozone/dist/src/main/blockade/ozone/cluster.py
vendored
Normal file
@ -0,0 +1,295 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import yaml
|
||||
import util
|
||||
from subprocess import call
|
||||
from blockadeUtils.blockade import Blockade
|
||||
|
||||
|
||||
class Command(object):
|
||||
docker = "docker"
|
||||
blockade = "blockade"
|
||||
docker_compose = "docker-compose"
|
||||
ozone = "/opt/hadoop/bin/ozone"
|
||||
freon = "/opt/hadoop/bin/ozone freon"
|
||||
|
||||
|
||||
class Configuration:
|
||||
"""
|
||||
Configurations to be used while starting Ozone Cluster.
|
||||
Here @property decorators is used to achieve getters, setters and delete
|
||||
behaviour for 'datanode_count' attribute.
|
||||
@datanode_count.setter will set the value for 'datanode_count' attribute.
|
||||
@datanode_count.deleter will delete the current value of 'datanode_count'
|
||||
attribute.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
__parent_dir__ = os.path.dirname(os.path.dirname(
|
||||
os.path.dirname(os.path.realpath(__file__))))
|
||||
self.docker_compose_file = os.path.join(__parent_dir__,
|
||||
"compose", "ozoneblockade",
|
||||
"docker-compose.yaml")
|
||||
self._datanode_count = 3
|
||||
os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file
|
||||
|
||||
@property
|
||||
def datanode_count(self):
|
||||
return self._datanode_count
|
||||
|
||||
@datanode_count.setter
|
||||
def datanode_count(self, datanode_count):
|
||||
self._datanode_count = datanode_count
|
||||
|
||||
@datanode_count.deleter
|
||||
def datanode_count(self):
|
||||
del self._datanode_count
|
||||
|
||||
|
||||
class Cluster(object):
|
||||
"""
|
||||
This represents Ozone Cluster.
|
||||
Here @property decorators is used to achieve getters, setters and delete
|
||||
behaviour for 'om', 'scm', 'datanodes' and 'clients' attributes.
|
||||
"""
|
||||
|
||||
__logger__ = logging.getLogger(__name__)
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
self.docker_compose_file = conf.docker_compose_file
|
||||
self._om = None
|
||||
self._scm = None
|
||||
self._datanodes = None
|
||||
self._clients = None
|
||||
self.scm_uuid = None
|
||||
self.datanode_dir = None
|
||||
|
||||
@property
|
||||
def om(self):
|
||||
return self._om
|
||||
|
||||
@om.setter
|
||||
def om(self, om):
|
||||
self._om = om
|
||||
|
||||
@om.deleter
|
||||
def om(self):
|
||||
del self._om
|
||||
|
||||
@property
|
||||
def scm(self):
|
||||
return self._scm
|
||||
|
||||
@scm.setter
|
||||
def scm(self, scm):
|
||||
self._scm = scm
|
||||
|
||||
@scm.deleter
|
||||
def scm(self):
|
||||
del self._scm
|
||||
|
||||
@property
|
||||
def datanodes(self):
|
||||
return self._datanodes
|
||||
|
||||
@datanodes.setter
|
||||
def datanodes(self, datanodes):
|
||||
self._datanodes = datanodes
|
||||
|
||||
@datanodes.deleter
|
||||
def datanodes(self):
|
||||
del self._datanodes
|
||||
|
||||
@property
|
||||
def clients(self):
|
||||
return self._clients
|
||||
|
||||
@clients.setter
|
||||
def clients(self, clients):
|
||||
self._clients = clients
|
||||
|
||||
@clients.deleter
|
||||
def clients(self):
|
||||
del self._clients
|
||||
|
||||
@classmethod
|
||||
def create(cls, config=Configuration()):
|
||||
return Cluster(config)
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start Ozone Cluster in docker containers.
|
||||
"""
|
||||
Cluster.__logger__.info("Starting Ozone Cluster")
|
||||
Blockade.blockade_destroy()
|
||||
call([Command.docker_compose, "-f", self.docker_compose_file,
|
||||
"up", "-d", "--scale",
|
||||
"datanode=" + str(self.conf.datanode_count)])
|
||||
Cluster.__logger__.info("Waiting 10s for cluster start up...")
|
||||
# Remove the sleep and wait only till the cluster is out of safemode
|
||||
# time.sleep(10)
|
||||
output = subprocess.check_output([Command.docker_compose, "-f",
|
||||
self.docker_compose_file, "ps"])
|
||||
node_list = []
|
||||
for out in output.split("\n")[2:-1]:
|
||||
node = out.split(" ")[0]
|
||||
node_list.append(node)
|
||||
Blockade.blockade_add(node)
|
||||
|
||||
Blockade.blockade_status()
|
||||
self.om = filter(lambda x: 'om' in x, node_list)[0]
|
||||
self.scm = filter(lambda x: 'scm' in x, node_list)[0]
|
||||
self.datanodes = sorted(list(filter(lambda x: 'datanode' in x, node_list)))
|
||||
self.clients = filter(lambda x: 'ozone_client' in x, node_list)
|
||||
self.scm_uuid = self.__get_scm_uuid__()
|
||||
self.datanode_dir = self.get_conf_value("hdds.datanode.dir")
|
||||
|
||||
assert node_list, "no node found in the cluster!"
|
||||
Cluster.__logger__.info("blockade created with nodes %s",
|
||||
' '.join(node_list))
|
||||
|
||||
def get_conf_value(self, key):
|
||||
"""
|
||||
Returns the value of given configuration key.
|
||||
"""
|
||||
command = [Command.ozone, "getconf -confKey " + key]
|
||||
exit_code, output = self.__run_docker_command__(command, self.om)
|
||||
return str(output).strip()
|
||||
|
||||
def scale_datanode(self, datanode_count):
|
||||
"""
|
||||
Commission new datanodes to the running cluster.
|
||||
"""
|
||||
call([Command.docker_compose, "-f", self.docker_compose_file,
|
||||
"up", "-d", "--scale", "datanode=" + datanode_count])
|
||||
|
||||
def partition_network(self, *args):
|
||||
"""
|
||||
Partition the network which is used by the cluster.
|
||||
"""
|
||||
Blockade.blockade_create_partition(*args)
|
||||
|
||||
|
||||
def restore_network(self):
|
||||
"""
|
||||
Restores the network partition.
|
||||
"""
|
||||
Blockade.blockade_join()
|
||||
|
||||
|
||||
def __get_scm_uuid__(self):
|
||||
"""
|
||||
Returns SCM's UUID.
|
||||
"""
|
||||
ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs")
|
||||
command = "cat %s/scm/current/VERSION" % ozone_metadata_dir
|
||||
exit_code, output = self.__run_docker_command__(command, self.scm)
|
||||
output_list = output.split("\n")
|
||||
key_value = [x for x in output_list if re.search(r"\w+=\w+", x)]
|
||||
uuid = [token for token in key_value if 'scmUuid' in token]
|
||||
return uuid.pop().split("=")[1].strip()
|
||||
|
||||
def get_container_states(self, datanode):
|
||||
"""
|
||||
Returns the state of all the containers in the given datanode.
|
||||
"""
|
||||
container_parent_path = "%s/hdds/%s/current/containerDir0" % \
|
||||
(self.datanode_dir, self.scm_uuid)
|
||||
command = "find %s -type f -name '*.container'" % container_parent_path
|
||||
exit_code, output = self.__run_docker_command__(command, datanode)
|
||||
container_state = {}
|
||||
|
||||
container_list = map(str.strip, output.split("\n"))
|
||||
for container_path in container_list:
|
||||
# Reading the container file.
|
||||
exit_code, output = self.__run_docker_command__(
|
||||
"cat " + container_path, datanode)
|
||||
if exit_code is not 0:
|
||||
continue
|
||||
data = output.split("\n")
|
||||
# Reading key value pairs from container file.
|
||||
key_value = [x for x in data if re.search(r"\w+:\s\w+", x)]
|
||||
content = "\n".join(key_value)
|
||||
content_yaml = yaml.load(content)
|
||||
if content_yaml is None:
|
||||
continue
|
||||
for key, value in content_yaml.items():
|
||||
content_yaml[key] = str(value).lstrip()
|
||||
# Stores the container state in a dictionary.
|
||||
container_state[content_yaml['containerID']] = content_yaml['state']
|
||||
return container_state
|
||||
|
||||
def run_freon(self, num_volumes, num_buckets, num_keys, key_size,
|
||||
replication_type="RATIS", replication_factor="THREE",
|
||||
run_on=None):
|
||||
"""
|
||||
Runs freon on the cluster.
|
||||
"""
|
||||
if run_on is None:
|
||||
run_on = self.om
|
||||
command = [Command.freon,
|
||||
" rk",
|
||||
" --numOfVolumes " + str(num_volumes),
|
||||
" --numOfBuckets " + str(num_buckets),
|
||||
" --numOfKeys " + str(num_keys),
|
||||
" --keySize " + str(key_size),
|
||||
" --replicationType " + replication_type,
|
||||
" --factor " + replication_factor]
|
||||
return self.__run_docker_command__(command, run_on)
|
||||
|
||||
def __run_docker_command__(self, command, run_on):
|
||||
if isinstance(command, list):
|
||||
command = ' '.join(command)
|
||||
command = [Command.docker,
|
||||
"exec " + run_on,
|
||||
command]
|
||||
return util.run_cmd(command)
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stops the Ozone Cluster.
|
||||
"""
|
||||
Cluster.__logger__.info("Stopping Ozone Cluster")
|
||||
call([Command.docker_compose, "-f", self.docker_compose_file, "down"])
|
||||
Blockade.blockade_destroy()
|
||||
|
||||
def container_state_predicate_all_closed(self, datanodes):
|
||||
for datanode in datanodes:
|
||||
container_states_dn = self.get_container_states(datanode)
|
||||
if not container_states_dn \
|
||||
or container_states_dn.popitem()[1] != 'CLOSED':
|
||||
return False
|
||||
return True
|
||||
|
||||
def container_state_predicate_one_closed(self, datanodes):
|
||||
for datanode in datanodes:
|
||||
container_states_dn = self.get_container_states(datanode)
|
||||
if container_states_dn and container_states_dn.popitem()[1] == 'CLOSED':
|
||||
return True
|
||||
return False
|
||||
|
||||
def container_state_predicate(self, datanode, state):
|
||||
container_states_dn = self.get_container_states(datanode)
|
||||
if container_states_dn and container_states_dn.popitem()[1] == state:
|
||||
return True
|
||||
return False
|
@ -16,132 +16,123 @@
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import time
|
||||
import re
|
||||
import logging
|
||||
from blockadeUtils.blockade import Blockade
|
||||
from clusterUtils.cluster_utils import ClusterUtils
|
||||
import util
|
||||
from ozone.cluster import Cluster
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
|
||||
FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
|
||||
"docker-compose.yaml")
|
||||
os.environ["DOCKER_COMPOSE_FILE"] = FILE
|
||||
SCALE = 3
|
||||
INCREASED_SCALE = 5
|
||||
CONTAINER_LIST = []
|
||||
OM = []
|
||||
SCM = []
|
||||
DATANODES = []
|
||||
|
||||
|
||||
def setup():
|
||||
global CONTAINER_LIST, OM, SCM, DATANODES
|
||||
Blockade.blockade_destroy()
|
||||
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
|
||||
exit_code, output = Blockade.blockade_status()
|
||||
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
|
||||
output
|
||||
OM, SCM, _, DATANODES = \
|
||||
ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
|
||||
|
||||
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
|
||||
"THREE")
|
||||
assert exit_code == 0, "freon run failed with output=[%s]" % output
|
||||
def setup_function(function):
|
||||
global cluster
|
||||
cluster = Cluster.create()
|
||||
cluster.start()
|
||||
|
||||
|
||||
def teardown():
|
||||
logger.info("Inside teardown")
|
||||
Blockade.blockade_destroy()
|
||||
def teardown_function(function):
|
||||
cluster.stop()
|
||||
|
||||
|
||||
def teardown_module():
|
||||
ClusterUtils.cluster_destroy(FILE)
|
||||
|
||||
|
||||
def test_isolatedatanode_singlenode(run_second_phase):
|
||||
def test_isolate_single_datanode():
|
||||
"""
|
||||
In this test, one of the datanodes (first datanode) cannot communicate
|
||||
with other two datanodes.
|
||||
All datanodes can communicate with SCM.
|
||||
Expectation :
|
||||
The container replica state in first datanode should be quasi-closed.
|
||||
The container replica state in other datanodes should be closed.
|
||||
"""
|
||||
first_set = [OM[0], SCM[0], DATANODES[0]]
|
||||
second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
|
||||
Blockade.blockade_create_partition(first_set, second_set)
|
||||
Blockade.blockade_status()
|
||||
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
|
||||
logger.info("Waiting for %s seconds before checking container status",
|
||||
os.environ["CONTAINER_STATUS_SLEEP"])
|
||||
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
|
||||
all_datanodes_container_status = \
|
||||
ClusterUtils.findall_container_status(FILE, SCALE)
|
||||
first_datanode_status = all_datanodes_container_status[0]
|
||||
closed_container_datanodes = [x for x in all_datanodes_container_status
|
||||
if x == 'CLOSED']
|
||||
assert first_datanode_status == 'QUASI_CLOSED'
|
||||
assert len(closed_container_datanodes) == 2, \
|
||||
"The container should have two closed replicas."
|
||||
In this test case we will create a network partition in such a way that
|
||||
one of the datanode will not be able to communicate with other datanodes
|
||||
but it will be able to communicate with SCM.
|
||||
|
||||
if str(run_second_phase).lower() == "true":
|
||||
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
|
||||
Blockade.blockade_status()
|
||||
logger.info("Waiting for %s seconds before checking container status",
|
||||
os.environ["CONTAINER_STATUS_SLEEP"])
|
||||
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
|
||||
all_datanodes_container_status = \
|
||||
ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
|
||||
closed_container_datanodes = [x for x in all_datanodes_container_status
|
||||
if x == 'CLOSED']
|
||||
assert len(closed_container_datanodes) >= 3, \
|
||||
"The container should have at least three closed replicas."
|
||||
Blockade.blockade_join()
|
||||
Blockade.blockade_status()
|
||||
_, output = \
|
||||
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
|
||||
assert re.search("Status: Success", output) is not None
|
||||
Once the network partition happens, SCM detects it and closes the pipeline,
|
||||
which in-turn closes the containers.
|
||||
|
||||
The container on the first two datanode will get CLOSED as they have quorum.
|
||||
The container replica on the third node will be QUASI_CLOSED as it is not
|
||||
able to connect with the other datanodes and it doesn't have latest BCSID.
|
||||
|
||||
def test_datanode_isolation_all(run_second_phase):
|
||||
"""
|
||||
In this test, none of the datanodes can communicate with other two
|
||||
Once we restore the network, the stale replica on the third datanode will be
|
||||
deleted and a latest replica will be copied from any one of the other
|
||||
datanodes.
|
||||
All datanodes can communicate with SCM.
|
||||
Expectation : The container should eventually have at least two closed
|
||||
replicas.
|
||||
"""
|
||||
first_set = [OM[0], SCM[0], DATANODES[0]]
|
||||
second_set = [OM[0], SCM[0], DATANODES[1]]
|
||||
third_set = [OM[0], SCM[0], DATANODES[2]]
|
||||
Blockade.blockade_create_partition(first_set, second_set, third_set)
|
||||
Blockade.blockade_status()
|
||||
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
|
||||
logger.info("Waiting for %s seconds before checking container status",
|
||||
os.environ["CONTAINER_STATUS_SLEEP"])
|
||||
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
|
||||
all_datanodes_container_status = \
|
||||
ClusterUtils.findall_container_status(FILE, SCALE)
|
||||
closed_container_datanodes = [x for x in all_datanodes_container_status
|
||||
if x == 'CLOSED']
|
||||
assert len(closed_container_datanodes) >= 2, \
|
||||
"The container should have at least two closed replicas."
|
||||
|
||||
if str(run_second_phase).lower() == "true":
|
||||
ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
|
||||
Blockade.blockade_status()
|
||||
logger.info("Waiting for %s seconds before checking container status",
|
||||
os.environ["CONTAINER_STATUS_SLEEP"])
|
||||
time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
|
||||
all_datanodes_container_status = \
|
||||
ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
|
||||
closed_container_datanodes = [x for x in all_datanodes_container_status
|
||||
if x == 'CLOSED']
|
||||
assert len(closed_container_datanodes) >= 3, \
|
||||
"The container should have at least three closed replicas."
|
||||
Blockade.blockade_join()
|
||||
Blockade.blockade_status()
|
||||
_, output = \
|
||||
ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
|
||||
assert re.search("Status: Success", output) is not None
|
||||
"""
|
||||
cluster.run_freon(1, 1, 1, 10240)
|
||||
first_set = [cluster.om, cluster.scm,
|
||||
cluster.datanodes[0], cluster.datanodes[1]]
|
||||
second_set = [cluster.om, cluster.scm, cluster.datanodes[2]]
|
||||
logger.info("Partitioning the network")
|
||||
cluster.partition_network(first_set, second_set)
|
||||
cluster.run_freon(1, 1, 1, 10240)
|
||||
logger.info("Waiting for container to be QUASI_CLOSED")
|
||||
|
||||
util.wait_until(lambda: cluster.get_container_states(cluster.datanodes[2])
|
||||
.popitem()[1] == 'QUASI_CLOSED',
|
||||
int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
|
||||
container_states_dn_0 = cluster.get_container_states(cluster.datanodes[0])
|
||||
container_states_dn_1 = cluster.get_container_states(cluster.datanodes[1])
|
||||
container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2])
|
||||
assert len(container_states_dn_0) != 0
|
||||
assert len(container_states_dn_1) != 0
|
||||
assert len(container_states_dn_2) != 0
|
||||
for key in container_states_dn_0:
|
||||
assert container_states_dn_0.get(key) == 'CLOSED'
|
||||
for key in container_states_dn_1:
|
||||
assert container_states_dn_1.get(key) == 'CLOSED'
|
||||
for key in container_states_dn_2:
|
||||
assert container_states_dn_2.get(key) == 'QUASI_CLOSED'
|
||||
|
||||
# Since the replica in datanode[2] doesn't have the latest BCSID,
|
||||
# ReplicationManager will delete it and copy a closed replica.
|
||||
# We will now restore the network and datanode[2] should get a
|
||||
# closed replica of the container
|
||||
logger.info("Restoring the network")
|
||||
cluster.restore_network()
|
||||
|
||||
logger.info("Waiting for the replica to be CLOSED")
|
||||
util.wait_until(
|
||||
lambda: cluster.container_state_predicate(cluster.datanodes[2], 'CLOSED'),
|
||||
int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
|
||||
container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2])
|
||||
assert len(container_states_dn_2) != 0
|
||||
for key in container_states_dn_2:
|
||||
assert container_states_dn_2.get(key) == 'CLOSED'
|
||||
|
||||
|
||||
def test_datanode_isolation_all():
|
||||
"""
|
||||
In this test case we will create a network partition in such a way that
|
||||
all datanodes cannot communicate with each other.
|
||||
All datanodes will be able to communicate with SCM.
|
||||
|
||||
Once the network partition happens, SCM detects it and closes the pipeline,
|
||||
which in-turn tries to close the containers.
|
||||
At least one of the replica should be in closed state
|
||||
|
||||
Once we restore the network, there will be three closed replicas.
|
||||
|
||||
"""
|
||||
cluster.run_freon(1, 1, 1, 10240)
|
||||
|
||||
assert len(cluster.get_container_states(cluster.datanodes[0])) != 0
|
||||
assert len(cluster.get_container_states(cluster.datanodes[1])) != 0
|
||||
assert len(cluster.get_container_states(cluster.datanodes[2])) != 0
|
||||
|
||||
logger.info("Partitioning the network")
|
||||
first_set = [cluster.om, cluster.scm, cluster.datanodes[0]]
|
||||
second_set = [cluster.om, cluster.scm, cluster.datanodes[1]]
|
||||
third_set = [cluster.om, cluster.scm, cluster.datanodes[2]]
|
||||
cluster.partition_network(first_set, second_set, third_set)
|
||||
|
||||
logger.info("Waiting for the replica to be CLOSED")
|
||||
util.wait_until(
|
||||
lambda: cluster.container_state_predicate_one_closed(cluster.datanodes),
|
||||
int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
|
||||
|
||||
# At least one of the replica should be in closed state
|
||||
assert cluster.container_state_predicate_one_closed(cluster.datanodes)
|
||||
|
||||
# After restoring the network all the replicas should be in
|
||||
# CLOSED state
|
||||
logger.info("Restoring the network")
|
||||
cluster.restore_network()
|
||||
|
||||
logger.info("Waiting for the container to be replicated")
|
||||
util.wait_until(
|
||||
lambda: cluster.container_state_predicate_all_closed(cluster.datanodes),
|
||||
int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
|
||||
assert cluster.container_state_predicate_all_closed(cluster.datanodes)
|
@ -16,11 +16,11 @@
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import random
|
||||
import pytest
|
||||
from blockadeUtils.blockade import Blockade
|
||||
from clusterUtils.cluster_utils import ClusterUtils
|
||||
from ozone.cluster import Cluster
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -32,30 +32,36 @@
|
||||
CONTAINER_LIST = []
|
||||
|
||||
|
||||
def setup_module():
|
||||
global CONTAINER_LIST
|
||||
Blockade.blockade_destroy()
|
||||
CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
|
||||
exit_code, output = Blockade.blockade_status()
|
||||
assert exit_code == 0, "blockade status command failed with output=[%s]" % \
|
||||
output
|
||||
def setup_function(function):
|
||||
global cluster
|
||||
cluster = Cluster.create()
|
||||
cluster.start()
|
||||
|
||||
|
||||
def teardown_module():
|
||||
Blockade.blockade_destroy()
|
||||
ClusterUtils.cluster_destroy(FILE)
|
||||
def teardown_function(function):
|
||||
cluster.stop()
|
||||
|
||||
|
||||
def teardown():
|
||||
logger.info("Inside teardown")
|
||||
Blockade.blockade_fast_all()
|
||||
time.sleep(5)
|
||||
@pytest.mark.parametrize("flaky_node", ["datanode", "scm", "om", "all"])
|
||||
def test_flaky(flaky_node):
|
||||
"""
|
||||
In these tests, we make the network of the nodes as flaky using blockade.
|
||||
There are 4 tests :
|
||||
1) one of the datanodes selected randomly and network of the datanode is
|
||||
made flaky.
|
||||
2) scm network is made flaky.
|
||||
3) om network is made flaky.
|
||||
4) Network of all the nodes are made flaky.
|
||||
|
||||
"""
|
||||
flaky_container_name = {
|
||||
"scm": cluster.scm,
|
||||
"om": cluster.om,
|
||||
"datanode": random.choice(cluster.datanodes),
|
||||
"all": "--all"
|
||||
}[flaky_node]
|
||||
|
||||
@pytest.mark.parametrize("flaky_nodes", ["datanode", "scm", "om", "all"])
|
||||
def test_flaky(flaky_nodes):
|
||||
Blockade.make_flaky(flaky_nodes, CONTAINER_LIST)
|
||||
Blockade.make_flaky(flaky_container_name)
|
||||
Blockade.blockade_status()
|
||||
exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
|
||||
"THREE")
|
||||
exit_code, output = cluster.run_freon(1, 1, 1, 10240)
|
||||
assert exit_code == 0, "freon run failed with output=[%s]" % output
|
52
hadoop-ozone/dist/src/main/blockade/util.py
vendored
Normal file
52
hadoop-ozone/dist/src/main/blockade/util.py
vendored
Normal file
@ -0,0 +1,52 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import time
|
||||
import re
|
||||
import logging
|
||||
import subprocess
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def wait_until(predicate, timeout, check_frequency=1):
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
if predicate():
|
||||
return
|
||||
time.sleep(check_frequency)
|
||||
|
||||
|
||||
def run_cmd(cmd):
|
||||
command = cmd
|
||||
if isinstance(cmd, list):
|
||||
command = ' '.join(cmd)
|
||||
logger.info(" RUNNING: %s", command)
|
||||
all_output = ""
|
||||
my_process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, shell=True)
|
||||
while my_process.poll() is None:
|
||||
op = my_process.stdout.readline()
|
||||
if op:
|
||||
all_output += op
|
||||
logger.info(op)
|
||||
other_output = my_process.communicate()
|
||||
other_output = other_output[0].strip()
|
||||
if other_output != "":
|
||||
all_output += other_output
|
||||
reg = re.compile(r"(\r\n|\n)$")
|
||||
all_output = reg.sub("", all_output, 1)
|
||||
return my_process.returncode, all_output
|
@ -26,6 +26,12 @@ OZONE-SITE.XML_ozone.scm.client.address=scm
|
||||
OZONE-SITE.XML_ozone.scm.dead.node.interval=5m
|
||||
OZONE-SITE.XML_ozone.replication=1
|
||||
OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
|
||||
OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
|
||||
OZONE-SITE.XML_ozone.scm.pipeline.destroy.timeout=15s
|
||||
OZONE-SITE.XML_hdds.heartbeat.interval=2s
|
||||
OZONE-SITE.XML_hdds.scm.replication.thread.interval=5s
|
||||
OZONE-SITE.XML_hdds.scm.replication.event.timeout=7s
|
||||
OZONE-SITE.XML_dfs.ratis.server.failure.duration=25s
|
||||
HDFS-SITE.XML_rpc.metrics.quantile.enable=true
|
||||
HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
|
||||
LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout
|
||||
|
Loading…
Reference in New Issue
Block a user